Skip to content

Commit f0708ff

Browse files
committed
fix(turbopack-node): async loop running in loaders worker thread (#76)
1 parent 97827d5 commit f0708ff

File tree

6 files changed

+114
-194
lines changed

6 files changed

+114
-194
lines changed

turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ declare const __turbopack_external_require__: {
33
} & ((id: string, thunk: () => any, esm?: boolean) => any)
44

55
import type { Channel as Ipc } from '../types'
6-
import { dirname, resolve as pathResolve, relative } from 'path'
6+
import { resolve as pathResolve, relative } from 'path'
77
import {
88
StackFrame,
99
parse as parseStackTrace,

turbopack/crates/turbopack-node/js/src/web_worker/evaluate.ts

Lines changed: 26 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Binding, TaskChannel } from '../worker_threads/taskChannel'
12
import { structuredError } from '../error'
23
import type { Channel } from '../types'
34

@@ -16,122 +17,53 @@ export declare const self: Self
1617
// @ts-ignore
1718
const { workerId, poolId } = self.workerData
1819

19-
interface Binding {
20-
recvWorkerRequest(poolId: string): Promise<number>
21-
recvMessageInWorker(workerId: number): Promise<string>
22-
notifyWorkerAck(taskId: number, workerId: number): Promise<void>
23-
sendTaskMessage(taskId: number, message: string): Promise<void>
24-
}
25-
2620
let binding: Binding = self.workerData.binding
2721

28-
const queue: string[][] = []
29-
3022
export const run = async (
3123
moduleFactory: () => Promise<{
3224
init?: () => Promise<void>
3325
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
3426
}>
3527
) => {
36-
const taskId = await binding.recvWorkerRequest(poolId)
28+
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
3729

38-
await binding.notifyWorkerAck(taskId, workerId)
30+
let isRunning = false
3931

40-
let nextId = 1
41-
const requests = new Map()
42-
const internalIpc = {
43-
sendInfo: (message: any) =>
44-
binding.sendTaskMessage(
32+
const run = async (taskId: number, args: string[]) => {
33+
try {
34+
if (typeof getValue !== 'function') {
35+
const module = await moduleFactory()
36+
if (typeof module.init === 'function') {
37+
await module.init()
38+
}
39+
getValue = module.default
40+
}
41+
const value = await getValue(new TaskChannel(binding, taskId), ...args)
42+
await binding.sendTaskMessage(
4543
taskId,
4644
JSON.stringify({
47-
type: 'info',
48-
data: message,
45+
type: 'end',
46+
data: value === undefined ? undefined : JSON.stringify(value),
47+
duration: 0,
4948
})
50-
),
51-
sendRequest: async (message: any) => {
52-
const id = nextId++
53-
let resolve, reject
54-
const promise = new Promise((res, rej) => {
55-
resolve = res
56-
reject = rej
57-
})
58-
requests.set(id, { resolve, reject })
59-
return binding
60-
.sendTaskMessage(
61-
taskId,
62-
JSON.stringify({ type: 'request', id, data: message })
63-
)
64-
.then(() => promise)
65-
},
66-
sendError: async (error: Error) => {
67-
try {
68-
await binding.sendTaskMessage(
69-
taskId,
70-
JSON.stringify({
71-
type: 'error',
72-
...structuredError(error),
73-
})
74-
)
75-
} catch (err) {
76-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
77-
// about them.
78-
console.error('failed to send error back to rust:', err)
79-
}
80-
},
81-
}
82-
83-
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
84-
try {
85-
const module = await moduleFactory()
86-
if (typeof module.init === 'function') {
87-
await module.init()
88-
}
89-
getValue = module.default
90-
} catch (err) {
91-
try {
49+
)
50+
} catch (err) {
9251
await binding.sendTaskMessage(
9352
taskId,
9453
JSON.stringify({
9554
type: 'error',
9655
...structuredError(err as Error),
9756
})
9857
)
99-
} catch (err) {
100-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
101-
// about them.
102-
console.error('failed to send error back to rust:', err)
103-
}
104-
}
105-
106-
let isRunning = false
107-
108-
const run = async () => {
109-
while (queue.length > 0) {
110-
const args = queue.shift()!
111-
try {
112-
const value = await getValue(internalIpc, ...args)
113-
await binding.sendTaskMessage(
114-
taskId,
115-
JSON.stringify({
116-
type: 'end',
117-
data: value === undefined ? undefined : JSON.stringify(value),
118-
duration: 0,
119-
})
120-
)
121-
} catch (err) {
122-
await binding.sendTaskMessage(
123-
taskId,
124-
JSON.stringify({
125-
type: 'error',
126-
...structuredError(err as Error),
127-
})
128-
)
129-
}
13058
}
13159
isRunning = false
13260
}
13361

13462
while (true) {
63+
const taskId = await binding.recvWorkerRequest(poolId)
64+
65+
await binding.notifyWorkerAck(taskId, workerId)
66+
13567
const msg_str = await binding.recvMessageInWorker(workerId)
13668

13769
const msg = JSON.parse(msg_str) as
@@ -148,17 +80,16 @@ export const run = async (
14880

14981
switch (msg.type) {
15082
case 'evaluate': {
151-
queue.push(msg.args)
15283
if (!isRunning) {
15384
isRunning = true
154-
run()
85+
run(taskId, msg.args)
15586
}
15687
break
15788
}
15889
case 'result': {
159-
const request = requests.get(msg.id)
90+
const request = TaskChannel.requests.get(msg.id)
16091
if (request) {
161-
requests.delete(msg.id)
92+
TaskChannel.requests.delete(msg.id)
16293
if (msg.error) {
16394
// Need to reject at next macro task queue, because some rejection callbacks is not registered when executing to here,
16495
// that will cause the error be propergated to schedule thread, then causing panic.

turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts

Lines changed: 25 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,125 +1,55 @@
11
import { threadId as workerId, workerData } from 'worker_threads'
22
import { structuredError } from '../error'
33
import type { Channel } from '../types'
4-
5-
interface Binding {
6-
recvWorkerRequest(poolId: string): Promise<number>
7-
recvMessageInWorker(workerId: number): Promise<string>
8-
notifyWorkerAck(taskId: number, workerId: number): Promise<void>
9-
sendTaskMessage(taskId: number, message: string): Promise<void>
10-
}
4+
import { Binding, TaskChannel } from './taskChannel'
115

126
const binding: Binding = require(
137
/* turbopackIgnore: true */ workerData.bindingPath
148
)
15-
16-
const queue: string[][] = []
17-
189
export const run = async (
1910
moduleFactory: () => Promise<{
2011
init?: () => Promise<void>
2112
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
2213
}>
2314
) => {
24-
const taskId = await binding.recvWorkerRequest(workerData.poolId)
15+
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
2516

26-
await binding.notifyWorkerAck(taskId, workerId)
17+
let isRunning = false
2718

28-
let nextId = 1
29-
const requests = new Map()
30-
const internalIpc = {
31-
sendInfo: (message: any) =>
32-
binding.sendTaskMessage(
19+
const run = async (taskId: number, args: string[]) => {
20+
try {
21+
if (typeof getValue !== 'function') {
22+
const module = await moduleFactory()
23+
if (typeof module.init === 'function') {
24+
await module.init()
25+
}
26+
getValue = module.default
27+
}
28+
const value = await getValue(new TaskChannel(binding, taskId), ...args)
29+
await binding.sendTaskMessage(
3330
taskId,
3431
JSON.stringify({
35-
type: 'info',
36-
data: message,
32+
type: 'end',
33+
data: value === undefined ? undefined : JSON.stringify(value),
34+
duration: 0,
3735
})
38-
),
39-
sendRequest: async (message: any) => {
40-
const id = nextId++
41-
let resolve, reject
42-
const promise = new Promise((res, rej) => {
43-
resolve = res
44-
reject = rej
45-
})
46-
requests.set(id, { resolve, reject })
47-
return binding
48-
.sendTaskMessage(
49-
taskId,
50-
JSON.stringify({ type: 'request', id, data: message })
51-
)
52-
.then(() => promise)
53-
},
54-
sendError: async (error: Error) => {
55-
try {
56-
await binding.sendTaskMessage(
57-
taskId,
58-
JSON.stringify({
59-
type: 'error',
60-
...structuredError(error),
61-
})
62-
)
63-
} catch (err) {
64-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
65-
// about them.
66-
console.error('failed to send error back to rust:', err)
67-
}
68-
},
69-
}
70-
71-
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
72-
try {
73-
const module = await moduleFactory()
74-
if (typeof module.init === 'function') {
75-
await module.init()
76-
}
77-
getValue = module.default
78-
} catch (err) {
79-
try {
36+
)
37+
} catch (err) {
8038
await binding.sendTaskMessage(
8139
taskId,
8240
JSON.stringify({
8341
type: 'error',
8442
...structuredError(err as Error),
8543
})
8644
)
87-
} catch (err) {
88-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
89-
// about them.
90-
console.error('failed to send error back to rust:', err)
91-
}
92-
}
93-
94-
let isRunning = false
95-
96-
const run = async () => {
97-
while (queue.length > 0) {
98-
const args = queue.shift()!
99-
try {
100-
const value = await getValue(internalIpc, ...args)
101-
await binding.sendTaskMessage(
102-
taskId,
103-
JSON.stringify({
104-
type: 'end',
105-
data: value === undefined ? undefined : JSON.stringify(value),
106-
duration: 0,
107-
})
108-
)
109-
} catch (err) {
110-
await binding.sendTaskMessage(
111-
taskId,
112-
JSON.stringify({
113-
type: 'error',
114-
...structuredError(err as Error),
115-
})
116-
)
117-
}
11845
}
11946
isRunning = false
12047
}
12148

12249
while (true) {
50+
const taskId = await binding.recvWorkerRequest(workerData.poolId)
51+
52+
await binding.notifyWorkerAck(taskId, workerId)
12353
const msg_str = await binding.recvMessageInWorker(workerId)
12454

12555
const msg = JSON.parse(msg_str) as
@@ -136,17 +66,16 @@ export const run = async (
13666

13767
switch (msg.type) {
13868
case 'evaluate': {
139-
queue.push(msg.args)
14069
if (!isRunning) {
14170
isRunning = true
142-
run()
71+
run(taskId, msg.args)
14372
}
14473
break
14574
}
14675
case 'result': {
147-
const request = requests.get(msg.id)
76+
const request = TaskChannel.requests.get(msg.id)
14877
if (request) {
149-
requests.delete(msg.id)
78+
TaskChannel.requests.delete(msg.id)
15079
if (msg.error) {
15180
// Need to reject at next macro task queue, because some rejection callbacks is not registered when executing to here,
15281
// that will cause the error be propergated to schedule thread, then causing panic.

0 commit comments

Comments
 (0)