Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions turbopack/crates/turbopack-node/js/src/web_worker/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ let binding: Binding = self.workerData.binding

binding.workerCreated(workerId)

const TEXT_ENCODER = new TextEncoder()
const TEXT_DECODER = new TextDecoder()

export const run = async (
moduleFactory: () => Promise<{
init?: () => Promise<void>
Expand All @@ -41,19 +44,23 @@ export const run = async (
const value = await getValue(new TaskChannel(binding, taskId), ...args)
await binding.sendTaskMessage({
taskId,
data: JSON.stringify({
type: 'end',
data: value === undefined ? undefined : JSON.stringify(value),
duration: 0,
}),
data: TEXT_ENCODER.encode(
JSON.stringify({
type: 'end',
data: value === undefined ? undefined : JSON.stringify(value),
duration: 0,
})
),
})
} catch (err) {
await binding.sendTaskMessage({
taskId,
data: JSON.stringify({
type: 'error',
...structuredError(err as Error),
}),
data: TEXT_ENCODER.encode(
JSON.stringify({
type: 'error',
...structuredError(err as Error),
})
),
})
}
if (queue.length > 0) {
Expand All @@ -65,10 +72,10 @@ export const run = async (
}

while (true) {
const { taskId, data: msg_str } =
const { taskId, data: msg_buf } =
await binding.recvTaskMessageInWorker(workerId)

const msg = JSON.parse(msg_str) as
const msg = JSON.parse(TEXT_DECODER.decode(msg_buf)) as
| {
type: 'evaluate'
args: string[]
Expand Down
29 changes: 18 additions & 11 deletions turbopack/crates/turbopack-node/js/src/worker_thread/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ if (workerData.binding) {

binding.workerCreated(workerId)

const TEXT_ENCODER = new TextEncoder()
const TEXT_DECODER = new TextDecoder()

export const run = async (
moduleFactory: () => Promise<{
init?: () => Promise<void>
Expand All @@ -42,19 +45,23 @@ export const run = async (
const value = await getValue(new TaskChannel(binding, taskId), ...args)
await binding.sendTaskMessage({
taskId,
data: JSON.stringify({
type: 'end',
data: value === undefined ? undefined : JSON.stringify(value),
duration: 0,
}),
data: TEXT_ENCODER.encode(
JSON.stringify({
type: 'end',
data: value === undefined ? undefined : JSON.stringify(value),
duration: 0,
})
),
})
} catch (err) {
await binding.sendTaskMessage({
taskId,
data: JSON.stringify({
type: 'error',
...structuredError(err as Error),
}),
data: TEXT_ENCODER.encode(
JSON.stringify({
type: 'error',
...structuredError(err as Error),
})
),
})
}
if (queue.length > 0) {
Expand All @@ -66,10 +73,10 @@ export const run = async (
}

while (true) {
const { taskId, data: msg_str } =
const { taskId, data: msg_buf } =
await binding.recvTaskMessageInWorker(workerId)

const msg = JSON.parse(msg_str) as
const msg = JSON.parse(TEXT_DECODER.decode(msg_buf)) as
| {
type: 'evaluate'
args: string[]
Expand Down
26 changes: 16 additions & 10 deletions turbopack/crates/turbopack-node/js/src/worker_thread/taskChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { structuredError } from '../error'

export interface TaskMessage {
taskId: number
data: string
data: Uint8Array
}

export interface Binding {
Expand All @@ -24,10 +24,12 @@ export class TaskChannel {
async sendInfo(message: any) {
return await this.binding.sendTaskMessage({
taskId: this.taskId,
data: JSON.stringify({
type: 'info',
data: message,
}),
data: Buffer.from(
JSON.stringify({
type: 'info',
data: message,
})
),
})
}

Expand All @@ -42,7 +44,9 @@ export class TaskChannel {
return await this.binding
.sendTaskMessage({
taskId: this.taskId,
data: JSON.stringify({ type: 'request', id, data: message }),
data: Buffer.from(
JSON.stringify({ type: 'request', id, data: message })
),
})
.then(() => promise)
}
Expand All @@ -51,10 +55,12 @@ export class TaskChannel {
try {
await this.binding.sendTaskMessage({
taskId: this.taskId,
data: JSON.stringify({
type: 'error',
...structuredError(error),
}),
data: Buffer.from(
JSON.stringify({
type: 'error',
...structuredError(error),
})
),
})
} catch (err) {
// There's nothing we can do about errors that happen after this point, we can't tell anyone
Expand Down
16 changes: 7 additions & 9 deletions turbopack/crates/turbopack-node/src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use turbo_tasks::{
TryJoinIterExt, Vc, duration_span, fxindexmap, get_effects, trace::TraceRawVcs,
};
use turbo_tasks_env::{EnvMap, ProcessEnv};
use turbo_tasks_fs::{
File, FileContent, FileSystemPath, json::parse_json_with_source_context, to_sys_path,
};
use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path};
use turbopack_core::{
asset::AssetContent,
changed::content_changed,
Expand Down Expand Up @@ -102,9 +100,9 @@ pub trait EvaluateOperation: Send + Sync {

#[async_trait::async_trait]
pub trait Operation: Send {
async fn recv(&mut self) -> Result<String>;
async fn recv(&mut self) -> Result<Vec<u8>>;

async fn send(&mut self, data: String) -> Result<()>;
async fn send(&mut self, data: Vec<u8>) -> Result<()>;

async fn wait_or_kill(&mut self) -> Result<ExitStatus>;

Expand Down Expand Up @@ -374,7 +372,7 @@ pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result<V
|| async {
let mut operation = pool.operation().await?;
operation
.send(serde_json::to_string(
.send(serde_json::to_vec(
&EvalJavaScriptOutgoingMessage::Evaluate {
args: args.iter().map(|v| &**v).collect(),
},
Expand Down Expand Up @@ -548,7 +546,7 @@ async fn pull_operation<T: EvaluateContext>(
let _guard = duration_span!("Node.js evaluation");

loop {
let message = parse_json_with_source_context(&operation.recv().await?)?;
let message = serde_json::from_slice(&operation.recv().await?)?;

match message {
EvalJavaScriptIncomingMessage::Error(error) => {
Expand All @@ -571,7 +569,7 @@ async fn pull_operation<T: EvaluateContext>(
{
Ok(response) => {
operation
.send(serde_json::to_string(
.send(serde_json::to_vec(
&EvalJavaScriptOutgoingMessage::Result {
id,
error: None,
Expand All @@ -582,7 +580,7 @@ async fn pull_operation<T: EvaluateContext>(
}
Err(e) => {
operation
.send(serde_json::to_string(
.send(serde_json::to_vec(
&EvalJavaScriptOutgoingMessage::Result {
id,
error: Some(PrettyPrintError(&e).to_string()),
Expand Down
8 changes: 4 additions & 4 deletions turbopack/crates/turbopack-node/src/process_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,18 +670,18 @@ pub struct ChildProcessOperation {

#[async_trait::async_trait]
impl Operation for ChildProcessOperation {
async fn recv(&mut self) -> Result<String> {
async fn recv(&mut self) -> Result<Vec<u8>> {
let vec = self
.with_process(|process| async move {
process.recv().await.context("failed to receive message")
})
.await?;
Ok(String::from_utf8(vec)?)
Ok(vec)
}

async fn send(&mut self, message: String) -> Result<()> {
async fn send(&mut self, message: Vec<u8>) -> Result<()> {
self.with_process(|process| async move {
timeout(Duration::from_secs(30), process.send(message.into_bytes()))
timeout(Duration::from_secs(30), process.send(message))
.await
.context("timeout while sending message")?
.context("failed to send message")?;
Expand Down
20 changes: 10 additions & 10 deletions turbopack/crates/turbopack-node/src/worker_pool/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ pub struct WorkerOptions {

pub(super) struct TaskMessage {
pub task_id: u32,
pub data: String,
pub data: Vec<u8>,
}

#[derive(Default)]
pub(crate) struct WorkerPoolOperation {
#[allow(clippy::type_complexity)]
worker_routed_channel: Mutex<FxHashMap<u32, Arc<MessageChannel<(u32, String)>>>>,
worker_routed_channel: Mutex<FxHashMap<u32, Arc<MessageChannel<(u32, Vec<u8>)>>>>,
#[allow(clippy::type_complexity)]
task_routed_channel: Mutex<FxHashMap<u32, Arc<MessageChannel<String>>>>,
task_routed_channel: Mutex<FxHashMap<u32, Arc<MessageChannel<Vec<u8>>>>>,
pub(crate) pools: Mutex<FxHashMap<Arc<WorkerOptions>, Arc<PoolState>>>,
}

Expand Down Expand Up @@ -143,7 +143,7 @@ impl WorkerPoolOperation {
&self,
worker_id: u32,
task_id: u32,
message: String,
message: Vec<u8>,
) -> Result<()> {
let channel = {
let mut map = self.worker_routed_channel.lock();
Expand Down Expand Up @@ -176,7 +176,7 @@ impl WorkerPoolOperation {
self.worker_routed_channel.lock().remove(&worker_id);
}

pub async fn recv_task_message(&self, task_id: u32) -> Result<String> {
pub async fn recv_task_message(&self, task_id: u32) -> Result<Vec<u8>> {
let channel = {
let mut map = self.task_routed_channel.lock();
map.entry(task_id)
Expand All @@ -197,7 +197,7 @@ impl WorkerPoolOperation {
pub(crate) async fn recv_task_message_in_worker(
&self,
worker_id: u32,
) -> Result<(u32, String)> {
) -> Result<(u32, Vec<u8>)> {
let channel = {
let mut map = self.worker_routed_channel.lock();
map.entry(worker_id)
Expand Down Expand Up @@ -230,7 +230,7 @@ pub(crate) static WORKER_POOL_OPERATION: LazyLock<WorkerPoolOperation> =
pub(crate) async fn send_message_to_worker(
worker_id: u32,
task_id: u32,
message: String,
message: Vec<u8>,
) -> Result<()> {
WORKER_POOL_OPERATION
.send_message_to_worker(worker_id, task_id, message)
Expand All @@ -241,7 +241,7 @@ pub(crate) fn terminate_worker(worker_options: Arc<WorkerOptions>, worker_id: u3
WORKER_POOL_OPERATION.terminate_worker(worker_options, worker_id)
}

pub(crate) async fn recv_task_message(task_id: u32) -> Result<String> {
pub(crate) async fn recv_task_message(task_id: u32) -> Result<Vec<u8>> {
WORKER_POOL_OPERATION.recv_task_message(task_id).await
}

Expand Down Expand Up @@ -274,11 +274,11 @@ impl Drop for WorkerOperation {

#[async_trait::async_trait]
impl Operation for WorkerOperation {
async fn recv(&mut self) -> Result<String> {
async fn recv(&mut self) -> Result<Vec<u8>> {
recv_task_message(self.task_id).await
}

async fn send(&mut self, message: String) -> Result<()> {
async fn send(&mut self, message: Vec<u8>) -> Result<()> {
send_message_to_worker(self.worker_id, self.task_id, message).await
}

Expand Down
21 changes: 14 additions & 7 deletions turbopack/crates/turbopack-node/src/worker_pool/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,22 @@ pub fn terminate_worker(options: Arc<WorkerOptions>, worker_id: u32) {
pub struct WasmTaskMessage {
#[wasm_bindgen(js_name = "taskId")]
pub task_id: u32,
#[wasm_bindgen(getter_with_clone)]
pub data: String,
data: js_sys::Uint8Array,
}

#[wasm_bindgen]
impl WasmTaskMessage {
#[wasm_bindgen(getter)]
pub fn data(&self) -> js_sys::Uint8Array {
self.data.clone()
}
}

impl From<TaskMessage> for WasmTaskMessage {
fn from(msg: TaskMessage) -> Self {
Self {
task_id: msg.task_id,
data: msg.data,
data: js_sys::Uint8Array::from(&msg.data[..]),
}
}
}
Expand All @@ -150,10 +157,10 @@ pub async fn send_task_message(message: JsValue) -> Result<(), JsError> {
.as_f64()
.ok_or_else(|| JsError::new("taskId must be a number"))? as u32;

let data = js_sys::Reflect::get(&message, &"data".into())
.map_err(|_| JsError::new("Failed to get data"))?
.as_string()
.ok_or_else(|| JsError::new("data must be a string"))?;
let data_js = js_sys::Reflect::get(&message, &"data".into())
.map_err(|_| JsError::new("Failed to get data"))?;

let data = js_sys::Uint8Array::new(&data_js).to_vec();
Comment on lines +160 to +163

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using js_sys::Uint8Array::new(&data_js) creates a new Uint8Array and copies the data from data_js. This is less efficient than a direct cast and can lead to unexpected behavior if data_js is not an array-like object (e.g., if it's a number, it will create an empty buffer of that size).

A more direct, efficient, and type-safe approach is to cast the JsValue to js_sys::Uint8Array using dyn_into. This avoids an unnecessary copy and ensures that the incoming data is of the expected Uint8Array type.

Suggested change
let data_js = js_sys::Reflect::get(&message, &"data".into())
.map_err(|_| JsError::new("Failed to get data"))?;
let data = js_sys::Uint8Array::new(&data_js).to_vec();
let data = js_sys::Reflect::get(&message, &"data".into())
.map_err(|_| JsError::new("Failed to get data"))?
.dyn_into::<js_sys::Uint8Array>()
.map_err(|_| JsError::new("data must be a Uint8Array"))?
.to_vec();


WORKER_POOL_OPERATION
.send_task_message(TaskMessage { task_id, data })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,16 @@ pub struct NapiWorkerTermination {
#[allow(unused)]
pub struct NapiTaskMessage {
pub task_id: u32,
pub data: String,
pub data: napi::bindgen_prelude::Buffer,
}

impl From<NapiTaskMessage> for TaskMessage {
fn from(message: NapiTaskMessage) -> Self {
let NapiTaskMessage { task_id, data } = message;
TaskMessage { task_id, data }
TaskMessage {
task_id,
data: data.into(),
}
}
}

Expand All @@ -163,7 +166,7 @@ pub async fn recv_task_message_in_worker(worker_id: u32) -> napi::Result<NapiTas
.await?;
Ok(NapiTaskMessage {
task_id,
data: message,
data: message.into(),
})
}

Expand Down
Loading