Skip to content

Commit 454df69

Browse files
committed
fix(turbopack-node): async loop running in loaders worker thread
1 parent 97827d5 commit 454df69

File tree

6 files changed

+152
-171
lines changed

6 files changed

+152
-171
lines changed

turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ declare const __turbopack_external_require__: {
33
} & ((id: string, thunk: () => any, esm?: boolean) => any)
44

55
import type { Channel as Ipc } from '../types'
6-
import { dirname, resolve as pathResolve, relative } from 'path'
6+
import { resolve as pathResolve, relative } from 'path'
77
import {
88
StackFrame,
99
parse as parseStackTrace,

turbopack/crates/turbopack-node/js/src/web_worker/evaluate.ts

Lines changed: 69 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -25,75 +25,45 @@ interface Binding {
2525

2626
let binding: Binding = self.workerData.binding
2727

28-
const queue: string[][] = []
28+
let nextId = 1
29+
const requests = new Map()
2930

30-
export const run = async (
31-
moduleFactory: () => Promise<{
32-
init?: () => Promise<void>
33-
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
34-
}>
35-
) => {
36-
const taskId = await binding.recvWorkerRequest(poolId)
31+
class InternalIpc {
32+
constructor(private taskId: number) {}
3733

38-
await binding.notifyWorkerAck(taskId, workerId)
39-
40-
let nextId = 1
41-
const requests = new Map()
42-
const internalIpc = {
43-
sendInfo: (message: any) =>
44-
binding.sendTaskMessage(
45-
taskId,
46-
JSON.stringify({
47-
type: 'info',
48-
data: message,
49-
})
50-
),
51-
sendRequest: async (message: any) => {
52-
const id = nextId++
53-
let resolve, reject
54-
const promise = new Promise((res, rej) => {
55-
resolve = res
56-
reject = rej
34+
async sendInfo(message: any) {
35+
return await binding.sendTaskMessage(
36+
this.taskId,
37+
JSON.stringify({
38+
type: 'info',
39+
data: message,
5740
})
58-
requests.set(id, { resolve, reject })
59-
return binding
60-
.sendTaskMessage(
61-
taskId,
62-
JSON.stringify({ type: 'request', id, data: message })
63-
)
64-
.then(() => promise)
65-
},
66-
sendError: async (error: Error) => {
67-
try {
68-
await binding.sendTaskMessage(
69-
taskId,
70-
JSON.stringify({
71-
type: 'error',
72-
...structuredError(error),
73-
})
74-
)
75-
} catch (err) {
76-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
77-
// about them.
78-
console.error('failed to send error back to rust:', err)
79-
}
80-
},
41+
)
8142
}
8243

83-
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
84-
try {
85-
const module = await moduleFactory()
86-
if (typeof module.init === 'function') {
87-
await module.init()
88-
}
89-
getValue = module.default
90-
} catch (err) {
44+
async sendRequest(message: any) {
45+
const id = nextId++
46+
let resolve, reject
47+
const promise = new Promise((res, rej) => {
48+
resolve = res
49+
reject = rej
50+
})
51+
requests.set(id, { resolve, reject })
52+
return await binding
53+
.sendTaskMessage(
54+
this.taskId,
55+
JSON.stringify({ type: 'request', id, data: message })
56+
)
57+
.then(() => promise)
58+
}
59+
60+
async sendError(error: Error) {
9161
try {
9262
await binding.sendTaskMessage(
93-
taskId,
63+
this.taskId,
9464
JSON.stringify({
9565
type: 'error',
96-
...structuredError(err as Error),
66+
...structuredError(error),
9767
})
9868
)
9969
} catch (err) {
@@ -102,36 +72,53 @@ export const run = async (
10272
console.error('failed to send error back to rust:', err)
10373
}
10474
}
75+
}
76+
77+
export const run = async (
78+
moduleFactory: () => Promise<{
79+
init?: () => Promise<void>
80+
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
81+
}>
82+
) => {
83+
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
10584

10685
let isRunning = false
10786

108-
const run = async () => {
109-
while (queue.length > 0) {
110-
const args = queue.shift()!
111-
try {
112-
const value = await getValue(internalIpc, ...args)
113-
await binding.sendTaskMessage(
114-
taskId,
115-
JSON.stringify({
116-
type: 'end',
117-
data: value === undefined ? undefined : JSON.stringify(value),
118-
duration: 0,
119-
})
120-
)
121-
} catch (err) {
122-
await binding.sendTaskMessage(
123-
taskId,
124-
JSON.stringify({
125-
type: 'error',
126-
...structuredError(err as Error),
127-
})
128-
)
87+
const run = async (taskId: number, args: string[]) => {
88+
try {
89+
if (typeof getValue !== 'function') {
90+
const module = await moduleFactory()
91+
if (typeof module.init === 'function') {
92+
await module.init()
93+
}
94+
getValue = module.default
12995
}
96+
const value = await getValue(new InternalIpc(taskId), ...args)
97+
await binding.sendTaskMessage(
98+
taskId,
99+
JSON.stringify({
100+
type: 'end',
101+
data: value === undefined ? undefined : JSON.stringify(value),
102+
duration: 0,
103+
})
104+
)
105+
} catch (err) {
106+
await binding.sendTaskMessage(
107+
taskId,
108+
JSON.stringify({
109+
type: 'error',
110+
...structuredError(err as Error),
111+
})
112+
)
130113
}
131114
isRunning = false
132115
}
133116

134117
while (true) {
118+
const taskId = await binding.recvWorkerRequest(poolId)
119+
120+
await binding.notifyWorkerAck(taskId, workerId)
121+
135122
const msg_str = await binding.recvMessageInWorker(workerId)
136123

137124
const msg = JSON.parse(msg_str) as
@@ -148,10 +135,9 @@ export const run = async (
148135

149136
switch (msg.type) {
150137
case 'evaluate': {
151-
queue.push(msg.args)
152138
if (!isRunning) {
153139
isRunning = true
154-
run()
140+
run(taskId, msg.args)
155141
}
156142
break
157143
}

turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts

Lines changed: 68 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -13,75 +13,45 @@ const binding: Binding = require(
1313
/* turbopackIgnore: true */ workerData.bindingPath
1414
)
1515

16-
const queue: string[][] = []
16+
let nextId = 1
17+
const requests = new Map()
1718

18-
export const run = async (
19-
moduleFactory: () => Promise<{
20-
init?: () => Promise<void>
21-
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
22-
}>
23-
) => {
24-
const taskId = await binding.recvWorkerRequest(workerData.poolId)
19+
class InternalIpc {
20+
constructor(private taskId: number) {}
2521

26-
await binding.notifyWorkerAck(taskId, workerId)
27-
28-
let nextId = 1
29-
const requests = new Map()
30-
const internalIpc = {
31-
sendInfo: (message: any) =>
32-
binding.sendTaskMessage(
33-
taskId,
34-
JSON.stringify({
35-
type: 'info',
36-
data: message,
37-
})
38-
),
39-
sendRequest: async (message: any) => {
40-
const id = nextId++
41-
let resolve, reject
42-
const promise = new Promise((res, rej) => {
43-
resolve = res
44-
reject = rej
22+
async sendInfo(message: any) {
23+
return await binding.sendTaskMessage(
24+
this.taskId,
25+
JSON.stringify({
26+
type: 'info',
27+
data: message,
4528
})
46-
requests.set(id, { resolve, reject })
47-
return binding
48-
.sendTaskMessage(
49-
taskId,
50-
JSON.stringify({ type: 'request', id, data: message })
51-
)
52-
.then(() => promise)
53-
},
54-
sendError: async (error: Error) => {
55-
try {
56-
await binding.sendTaskMessage(
57-
taskId,
58-
JSON.stringify({
59-
type: 'error',
60-
...structuredError(error),
61-
})
62-
)
63-
} catch (err) {
64-
// There's nothing we can do about errors that happen after this point, we can't tell anyone
65-
// about them.
66-
console.error('failed to send error back to rust:', err)
67-
}
68-
},
29+
)
6930
}
7031

71-
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
72-
try {
73-
const module = await moduleFactory()
74-
if (typeof module.init === 'function') {
75-
await module.init()
76-
}
77-
getValue = module.default
78-
} catch (err) {
32+
async sendRequest(message: any) {
33+
const id = nextId++
34+
let resolve, reject
35+
const promise = new Promise((res, rej) => {
36+
resolve = res
37+
reject = rej
38+
})
39+
requests.set(id, { resolve, reject })
40+
return await binding
41+
.sendTaskMessage(
42+
this.taskId,
43+
JSON.stringify({ type: 'request', id, data: message })
44+
)
45+
.then(() => promise)
46+
}
47+
48+
async sendError(error: Error) {
7949
try {
8050
await binding.sendTaskMessage(
81-
taskId,
51+
this.taskId,
8252
JSON.stringify({
8353
type: 'error',
84-
...structuredError(err as Error),
54+
...structuredError(error),
8555
})
8656
)
8757
} catch (err) {
@@ -90,36 +60,52 @@ export const run = async (
9060
console.error('failed to send error back to rust:', err)
9161
}
9262
}
63+
}
64+
65+
export const run = async (
66+
moduleFactory: () => Promise<{
67+
init?: () => Promise<void>
68+
default: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
69+
}>
70+
) => {
71+
let getValue: (channel: Channel<any, any>, ...deserializedArgs: any[]) => any
9372

9473
let isRunning = false
9574

96-
const run = async () => {
97-
while (queue.length > 0) {
98-
const args = queue.shift()!
99-
try {
100-
const value = await getValue(internalIpc, ...args)
101-
await binding.sendTaskMessage(
102-
taskId,
103-
JSON.stringify({
104-
type: 'end',
105-
data: value === undefined ? undefined : JSON.stringify(value),
106-
duration: 0,
107-
})
108-
)
109-
} catch (err) {
110-
await binding.sendTaskMessage(
111-
taskId,
112-
JSON.stringify({
113-
type: 'error',
114-
...structuredError(err as Error),
115-
})
116-
)
75+
const run = async (taskId: number, args: string[]) => {
76+
try {
77+
if (typeof getValue !== 'function') {
78+
const module = await moduleFactory()
79+
if (typeof module.init === 'function') {
80+
await module.init()
81+
}
82+
getValue = module.default
11783
}
84+
const value = await getValue(new InternalIpc(taskId), ...args)
85+
await binding.sendTaskMessage(
86+
taskId,
87+
JSON.stringify({
88+
type: 'end',
89+
data: value === undefined ? undefined : JSON.stringify(value),
90+
duration: 0,
91+
})
92+
)
93+
} catch (err) {
94+
await binding.sendTaskMessage(
95+
taskId,
96+
JSON.stringify({
97+
type: 'error',
98+
...structuredError(err as Error),
99+
})
100+
)
118101
}
119102
isRunning = false
120103
}
121104

122105
while (true) {
106+
const taskId = await binding.recvWorkerRequest(workerData.poolId)
107+
108+
await binding.notifyWorkerAck(taskId, workerId)
123109
const msg_str = await binding.recvMessageInWorker(workerId)
124110

125111
const msg = JSON.parse(msg_str) as
@@ -136,10 +122,9 @@ export const run = async (
136122

137123
switch (msg.type) {
138124
case 'evaluate': {
139-
queue.push(msg.args)
140125
if (!isRunning) {
141126
isRunning = true
142-
run()
127+
run(taskId, msg.args)
143128
}
144129
break
145130
}

0 commit comments

Comments
 (0)