From 6aafecbac8348e4d09fb0d7724ecc0dabb4d8d62 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Sun, 28 Dec 2025 20:07:54 -0800 Subject: [PATCH 01/21] Channel-based mesh collection and prep. --- crates/bevy_pbr/Cargo.toml | 24 +-- crates/bevy_pbr/src/render/mesh.rs | 278 +++++++++++++++++++---------- 2 files changed, 197 insertions(+), 105 deletions(-) diff --git a/crates/bevy_pbr/Cargo.toml b/crates/bevy_pbr/Cargo.toml index 6e9efc0faf49b..0d7d4dcaaaa91 100644 --- a/crates/bevy_pbr/Cargo.toml +++ b/crates/bevy_pbr/Cargo.toml @@ -22,14 +22,14 @@ bluenoise_texture = ["bevy_image/ktx2", "bevy_image/zstd"] shader_format_glsl = ["bevy_shader/shader_format_glsl"] trace = ["bevy_render/trace"] # Enables the meshlet renderer for dense high-poly scenes (experimental) -meshlet = ["dep:lz4_flex", "dep:range-alloc", "dep:bevy_tasks"] +meshlet = ["dep:lz4_flex", "dep:range-alloc"] # Enables processing meshes into meshlet meshes meshlet_processor = [ - "meshlet", - "dep:meshopt", - "dep:metis", - "dep:itertools", - "dep:bitvec", + "meshlet", + "dep:meshopt", + "dep:metis", + "dep:itertools", + "dep:bitvec", ] [dependencies] @@ -44,21 +44,21 @@ bevy_ecs = { path = "../bevy_ecs", version = "0.18.0-dev" } bevy_light = { path = "../bevy_light", version = "0.18.0-dev" } bevy_image = { path = "../bevy_image", version = "0.18.0-dev" } bevy_mesh = { path = "../bevy_mesh", version = "0.18.0-dev", features = [ - "morph", - "bevy_mikktspace", + "morph", + "bevy_mikktspace", ] } bevy_shader = { path = "../bevy_shader", version = "0.18.0-dev" } bevy_math = { path = "../bevy_math", version = "0.18.0-dev" } bevy_reflect = { path = "../bevy_reflect", version = "0.18.0-dev" } bevy_render = { path = "../bevy_render", version = "0.18.0-dev", features = [ - "morph", + "morph", ] } bevy_camera = { path = "../bevy_camera", version = "0.18.0-dev" } -bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", optional = true } +bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev" } bevy_transform = { path = "../bevy_transform", version = "0.18.0-dev" } bevy_utils = { path = "../bevy_utils", version = "0.18.0-dev" } bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-features = false, features = [ - "std", + "std", ] } # other @@ -68,7 +68,7 @@ thiserror = { version = "2", default-features = false } derive_more = { version = "2", default-features = false, features = ["from"] } # meshlet lz4_flex = { version = "0.12", default-features = false, features = [ - "frame", + "frame", ], optional = true } range-alloc = { version = "0.1", optional = true } meshopt = { version = "0.6.2", optional = true } diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index e0243899f6b8d..2487fc79e6dcb 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -65,7 +65,7 @@ use bevy_utils::{default, Parallel, TypeIdMap}; use core::any::TypeId; use core::mem::size_of; use material_bind_groups::MaterialBindingId; -use tracing::{error, warn}; +use tracing::{error, info_span, warn}; use self::irradiance_volume::IRRADIANCE_VOLUMES_ARE_USABLE; use crate::{ @@ -88,6 +88,7 @@ use bevy_render::prelude::Msaa; use bevy_render::sync_world::{MainEntity, MainEntityHashMap}; use bevy_render::view::ExtractedView; use bevy_render::RenderSystems::PrepareAssets; +use bevy_tasks::ComputeTaskPool; use bytemuck::{Pod, Zeroable}; use nonmax::{NonMaxU16, NonMaxU32}; @@ -1123,22 +1124,19 @@ impl RenderMeshInstanceGpuQueue { } impl RenderMeshInstanceGpuBuilder { - /// 
Flushes this mesh instance to the [`RenderMeshInstanceGpu`] and - /// [`MeshInputUniform`] tables, replacing the existing entry if applicable. - fn update( - mut self, + /// Prepares the data needed to update the mesh instance. + /// + /// This is the thread-safe part of the update. + fn prepare( + self, entity: MainEntity, - render_mesh_instances: &mut MainEntityHashMap, - current_input_buffer: &mut InstanceInputUniformBuffer, - previous_input_buffer: &mut InstanceInputUniformBuffer, mesh_allocator: &MeshAllocator, mesh_material_ids: &RenderMaterialInstances, render_material_bindings: &RenderMaterialBindings, render_lightmaps: &RenderLightmaps, skin_uniforms: &SkinUniforms, timestamp: FrameCount, - meshes_to_reextract_next_frame: &mut MeshesToReextractNextFrame, - ) -> Option { + ) -> RenderMeshInstanceGpuPrepared { let (first_vertex_index, vertex_count) = match mesh_allocator.mesh_vertex_slice(&self.shared.mesh_asset_id) { Some(mesh_vertex_slice) => ( @@ -1168,17 +1166,13 @@ impl RenderMeshInstanceGpuBuilder { let mesh_material = mesh_material_ids.mesh_material(entity); let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() { match render_material_bindings.get(&mesh_material) { - Some(binding_id) => *binding_id, - None => { - meshes_to_reextract_next_frame.insert(entity); - return None; - } + Some(binding_id) => Some(*binding_id), + None => None, } } else { // Use a dummy material binding ID. - MaterialBindingId::default() + Some(MaterialBindingId::default()) }; - self.shared.material_bindings_index = mesh_material_binding_id; let lightmap_slot = match render_lightmaps.render_lightmaps.get(&entity) { Some(render_lightmap) => u16::from(*render_lightmap.slot_index), @@ -1188,10 +1182,14 @@ impl RenderMeshInstanceGpuBuilder { .render_lightmaps .get(&entity) .map(|lightmap| lightmap.slab_index); - self.shared.lightmap_slab_index = lightmap_slab_index; + + let mut shared = self.shared; + shared.lightmap_slab_index = lightmap_slab_index; + + let material_bindings_index = mesh_material_binding_id.unwrap_or_default(); // Create the mesh input uniform. - let mut mesh_input_uniform = MeshInputUniform { + let mesh_input_uniform = MeshInputUniform { world_from_local: self.world_from_local.to_transpose(), lightmap_uv_rect: self.lightmap_uv_rect, flags: self.mesh_flags.bits(), @@ -1205,19 +1203,50 @@ impl RenderMeshInstanceGpuBuilder { vertex_count }, current_skin_index, - material_and_lightmap_bind_group_slot: u32::from( - self.shared.material_bindings_index.slot, - ) | ((lightmap_slot as u32) << 16), - tag: self.shared.tag, + material_and_lightmap_bind_group_slot: u32::from(material_bindings_index.slot) + | ((lightmap_slot as u32) << 16), + tag: shared.tag, pad: 0, }; - // Did the last frame contain this entity as well? - let current_uniform_index; let world_from_local = &self.world_from_local; let center = - world_from_local.matrix3.mul_vec3(self.shared.center) + world_from_local.translation; + world_from_local.matrix3.mul_vec3(shared.center) + world_from_local.translation; + + RenderMeshInstanceGpuPrepared { + shared, + mesh_input_uniform, + center, + material_ready: mesh_material_binding_id.is_some(), + } + } +} + +pub struct RenderMeshInstanceGpuPrepared { + shared: RenderMeshInstanceShared, + mesh_input_uniform: MeshInputUniform, + center: Vec3, + material_ready: bool, +} +impl RenderMeshInstanceGpuPrepared { + /// Flushes this mesh instance to the [`RenderMeshInstanceGpu`] and + /// [`MeshInputUniform`] tables, replacing the existing entry if applicable. 
+ fn update( + mut self, + entity: MainEntity, + render_mesh_instances: &mut MainEntityHashMap, + current_input_buffer: &mut InstanceInputUniformBuffer, + previous_input_buffer: &mut InstanceInputUniformBuffer, + meshes_to_reextract_next_frame: &mut MeshesToReextractNextFrame, + ) -> Option { + if !self.material_ready { + meshes_to_reextract_next_frame.insert(entity); + return None; + } + + // Did the last frame contain this entity as well? + let current_uniform_index; match render_mesh_instances.entry(entity) { Entry::Occupied(mut occupied_entry) => { // Yes, it did. Replace its entry with the new one. @@ -1230,15 +1259,15 @@ impl RenderMeshInstanceGpuBuilder { let previous_mesh_input_uniform = current_input_buffer.get_unchecked(current_uniform_index); let previous_input_index = previous_input_buffer.add(previous_mesh_input_uniform); - mesh_input_uniform.previous_input_index = previous_input_index; + self.mesh_input_uniform.previous_input_index = previous_input_index; // Write in the new mesh input uniform. - current_input_buffer.set(current_uniform_index, mesh_input_uniform); + current_input_buffer.set(current_uniform_index, self.mesh_input_uniform); occupied_entry.replace_entry_with(|_, _| { Some(RenderMeshInstanceGpu { shared: self.shared, - center, + center: self.center, current_uniform_index: NonMaxU32::new(current_uniform_index) .unwrap_or_default(), }) @@ -1247,11 +1276,11 @@ impl RenderMeshInstanceGpuBuilder { Entry::Vacant(vacant_entry) => { // No, this is a new entity. Push its data on to the buffer. - current_uniform_index = current_input_buffer.add(mesh_input_uniform); + current_uniform_index = current_input_buffer.add(self.mesh_input_uniform); vacant_entry.insert(RenderMeshInstanceGpu { shared: self.shared, - center, + center: self.center, current_uniform_index: NonMaxU32::new(current_uniform_index) .unwrap_or_default(), }); @@ -1768,80 +1797,143 @@ pub fn collect_meshes_for_gpu_building( previous_input_buffer.clear(); + let (cpu_tx, cpu_rx) = + std::sync::mpsc::channel::>(); + let (gpu_tx, gpu_rx) = std::sync::mpsc::channel::< + Vec<(MainEntity, RenderMeshInstanceGpuPrepared, MeshCullingData)>, + >(); + let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); + + const CHUNK_SIZE: usize = 4096; + // Build the [`RenderMeshInstance`]s and [`MeshInputUniform`]s. + ComputeTaskPool::get().scope(|scope| { + let cpu_culling_changed = cpu_tx; + let gpu_culling_changed = gpu_tx; + let mesh_allocator = &mesh_allocator; + let mesh_material_ids = &mesh_material_ids; + let render_material_bindings = &render_material_bindings; + let render_lightmaps = &render_lightmaps; + let skin_uniforms = &skin_uniforms; + let frame_count = *frame_count; + + scope.spawn(async move { + let _g = info_span!("update_mesh_instances").entered(); + + for (entity, prepared) in cpu_rx.iter().flatten() { + prepared.update( + entity, + &mut *render_mesh_instances, + current_input_buffer, + previous_input_buffer, + &mut meshes_to_reextract_next_frame, + ); + } - for queue in render_mesh_instance_queues.iter_mut() { - match *queue { - RenderMeshInstanceGpuQueue::None => { - // This can only happen if the queue is empty. 
+ for (entity, prepared, mesh_culling_builder) in gpu_rx.iter().flatten() { + let Some(instance_data_index) = prepared.update( + entity, + &mut *render_mesh_instances, + current_input_buffer, + previous_input_buffer, + &mut meshes_to_reextract_next_frame, + ) else { + continue; + }; + mesh_culling_builder + .update(&mut mesh_culling_data_buffer, instance_data_index as usize); } - RenderMeshInstanceGpuQueue::CpuCulling { - ref mut changed, - ref mut removed, - } => { - for (entity, mesh_instance_builder) in changed.drain(..) { - mesh_instance_builder.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - &mesh_allocator, - &mesh_material_ids, - &render_material_bindings, - &render_lightmaps, - &skin_uniforms, - *frame_count, - &mut meshes_to_reextract_next_frame, - ); - } + for entity in removed_rx.iter().flatten() { + remove_mesh_input_uniform( + entity, + &mut *render_mesh_instances, + current_input_buffer, + ); + } - for entity in removed.drain(..) { - remove_mesh_input_uniform( - entity, - &mut *render_mesh_instances, - current_input_buffer, - ); + // Buffers can't be empty. Make sure there's something in the previous input buffer. + previous_input_buffer.ensure_nonempty(); + }); + + for queue in render_mesh_instance_queues.iter_mut() { + match *queue { + RenderMeshInstanceGpuQueue::None => { + // This can only happen if the queue is empty. } - } - RenderMeshInstanceGpuQueue::GpuCulling { - ref mut changed, - ref mut removed, - } => { - for (entity, mesh_instance_builder, mesh_culling_builder) in changed.drain(..) { - let Some(instance_data_index) = mesh_instance_builder.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - &mesh_allocator, - &mesh_material_ids, - &render_material_bindings, - &render_lightmaps, - &skin_uniforms, - *frame_count, - &mut meshes_to_reextract_next_frame, - ) else { - continue; - }; - mesh_culling_builder - .update(&mut mesh_culling_data_buffer, instance_data_index as usize); + RenderMeshInstanceGpuQueue::CpuCulling { + ref mut changed, + ref mut removed, + } => { + let _g = info_span!("scope_outer").entered(); + let cpu_culling = cpu_culling_changed.clone(); + if !removed.is_empty() { + removed_tx.send(std::mem::take(removed)).ok(); + } + scope.spawn(async move { + let _g = info_span!("par_cpu_culling_scope").entered(); + while !changed.is_empty() { + let chunk = changed + .drain(..CHUNK_SIZE.min(changed.len())) + .map(|(entity, mesh_instance_builder)| { + ( + entity, + mesh_instance_builder.prepare( + entity, + mesh_allocator, + mesh_material_ids, + render_material_bindings, + render_lightmaps, + skin_uniforms, + frame_count, + ), + ) + }) + .collect(); + cpu_culling.send(chunk).ok(); + } + }); } - for entity in removed.drain(..) 
{ - remove_mesh_input_uniform( - entity, - &mut *render_mesh_instances, - current_input_buffer, - ); + RenderMeshInstanceGpuQueue::GpuCulling { + ref mut changed, + ref mut removed, + } => { + let _g = info_span!("scope_outer").entered(); + let gpu_culling = gpu_culling_changed.clone(); + if !removed.is_empty() { + removed_tx.send(std::mem::take(removed)).ok(); + } + scope.spawn(async move { + let _g = info_span!("par_gpu_culling_scope").entered(); + while !changed.is_empty() { + let chunk = changed + .drain(..CHUNK_SIZE.min(changed.len())) + .map(|(entity, mesh_instance_builder, mesh_culling_builder)| { + ( + entity, + mesh_instance_builder.prepare( + entity, + mesh_allocator, + mesh_material_ids, + render_material_bindings, + render_lightmaps, + skin_uniforms, + frame_count, + ), + mesh_culling_builder, + ) + }) + .collect(); + gpu_culling.send(chunk).ok(); + } + }); } } } - } - - // Buffers can't be empty. Make sure there's something in the previous input buffer. - previous_input_buffer.ensure_nonempty(); + drop(removed_tx); + }); } /// All data needed to construct a pipeline for rendering 3D meshes. From 1ba8491c389763bf9de71c1467e89596e328f411 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Sun, 28 Dec 2025 20:52:38 -0800 Subject: [PATCH 02/21] Tweak chunk size --- crates/bevy_pbr/src/render/mesh.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 2487fc79e6dcb..893713441e2ec 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1804,7 +1804,7 @@ pub fn collect_meshes_for_gpu_building( >(); let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); - const CHUNK_SIZE: usize = 4096; + const CHUNK_SIZE: usize = 10_000; // Build the [`RenderMeshInstance`]s and [`MeshInputUniform`]s. 
ComputeTaskPool::get().scope(|scope| { From 3eba77c075b783f7f10707311bd6abcb2416306f Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Sun, 28 Dec 2025 23:24:34 -0800 Subject: [PATCH 03/21] Revert toml formatter changes --- crates/bevy_pbr/Cargo.toml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/bevy_pbr/Cargo.toml b/crates/bevy_pbr/Cargo.toml index 0d7d4dcaaaa91..642bec811bfd3 100644 --- a/crates/bevy_pbr/Cargo.toml +++ b/crates/bevy_pbr/Cargo.toml @@ -25,11 +25,11 @@ trace = ["bevy_render/trace"] meshlet = ["dep:lz4_flex", "dep:range-alloc"] # Enables processing meshes into meshlet meshes meshlet_processor = [ - "meshlet", - "dep:meshopt", - "dep:metis", - "dep:itertools", - "dep:bitvec", + "meshlet", + "dep:meshopt", + "dep:metis", + "dep:itertools", + "dep:bitvec", ] [dependencies] @@ -44,21 +44,21 @@ bevy_ecs = { path = "../bevy_ecs", version = "0.18.0-dev" } bevy_light = { path = "../bevy_light", version = "0.18.0-dev" } bevy_image = { path = "../bevy_image", version = "0.18.0-dev" } bevy_mesh = { path = "../bevy_mesh", version = "0.18.0-dev", features = [ - "morph", - "bevy_mikktspace", + "morph", + "bevy_mikktspace", ] } bevy_shader = { path = "../bevy_shader", version = "0.18.0-dev" } bevy_math = { path = "../bevy_math", version = "0.18.0-dev" } bevy_reflect = { path = "../bevy_reflect", version = "0.18.0-dev" } bevy_render = { path = "../bevy_render", version = "0.18.0-dev", features = [ - "morph", + "morph", ] } bevy_camera = { path = "../bevy_camera", version = "0.18.0-dev" } bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev" } bevy_transform = { path = "../bevy_transform", version = "0.18.0-dev" } bevy_utils = { path = "../bevy_utils", version = "0.18.0-dev" } bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-features = false, features = [ - "std", + "std", ] } # other @@ -68,7 +68,7 @@ thiserror = { version = "2", default-features = false } derive_more = { version = "2", default-features = false, features = ["from"] } # meshlet lz4_flex = { version = "0.12", default-features = false, features = [ - "frame", + "frame", ], optional = true } range-alloc = { version = "0.1", optional = true } meshopt = { version = "0.6.2", optional = true } From a2cfded58d9bd6cc5b70c4e9df1616b28d383a61 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 01:35:38 -0800 Subject: [PATCH 04/21] Fix bug in mesh material indexing --- crates/bevy_pbr/src/render/mesh.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 893713441e2ec..e9f933ce50f0a 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1185,8 +1185,9 @@ impl RenderMeshInstanceGpuBuilder { let mut shared = self.shared; shared.lightmap_slab_index = lightmap_slab_index; + shared.material_bindings_index = mesh_material_binding_id.unwrap_or_default(); - let material_bindings_index = mesh_material_binding_id.unwrap_or_default(); + let material_bindings_index = shared.material_bindings_index; // Create the mesh input uniform. 
let mesh_input_uniform = MeshInputUniform { From cd40ac6fbceafa97b4d25e2f9c12762d4bd1c210 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 13:15:17 -0800 Subject: [PATCH 05/21] fix clippy lints --- crates/bevy_pbr/src/render/mesh.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index e9f933ce50f0a..6b3cf4ff70016 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1165,10 +1165,9 @@ impl RenderMeshInstanceGpuBuilder { // `meshes_to_reextract_next_frame` and bail. let mesh_material = mesh_material_ids.mesh_material(entity); let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() { - match render_material_bindings.get(&mesh_material) { - Some(binding_id) => Some(*binding_id), - None => None, - } + render_material_bindings + .get(&mesh_material) + .map(|binding_id| *binding_id) } else { // Use a dummy material binding ID. Some(MaterialBindingId::default()) @@ -1870,7 +1869,7 @@ pub fn collect_meshes_for_gpu_building( let _g = info_span!("scope_outer").entered(); let cpu_culling = cpu_culling_changed.clone(); if !removed.is_empty() { - removed_tx.send(std::mem::take(removed)).ok(); + removed_tx.send(core::mem::take(removed)).ok(); } scope.spawn(async move { let _g = info_span!("par_cpu_culling_scope").entered(); @@ -1904,7 +1903,7 @@ pub fn collect_meshes_for_gpu_building( let _g = info_span!("scope_outer").entered(); let gpu_culling = gpu_culling_changed.clone(); if !removed.is_empty() { - removed_tx.send(std::mem::take(removed)).ok(); + removed_tx.send(core::mem::take(removed)).ok(); } scope.spawn(async move { let _g = info_span!("par_gpu_culling_scope").entered(); From d360eb964c54f3127c7f14630d56914aa171a0a1 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 13:15:44 -0800 Subject: [PATCH 06/21] more lints --- crates/bevy_pbr/src/render/mesh.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 6b3cf4ff70016..f582c3b8062bb 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1165,9 +1165,7 @@ impl RenderMeshInstanceGpuBuilder { // `meshes_to_reextract_next_frame` and bail. let mesh_material = mesh_material_ids.mesh_material(entity); let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() { - render_material_bindings - .get(&mesh_material) - .map(|binding_id| *binding_id) + render_material_bindings.get(&mesh_material).copied() } else { // Use a dummy material binding ID. Some(MaterialBindingId::default()) From 8e4780fa064f665f6dfcea3feda09aa93cd85dbc Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 13:25:41 -0800 Subject: [PATCH 07/21] reduce diff --- crates/bevy_pbr/src/render/mesh.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index f582c3b8062bb..a06848ab343b7 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1128,7 +1128,7 @@ impl RenderMeshInstanceGpuBuilder { /// /// This is the thread-safe part of the update. 
fn prepare( - self, + mut self, entity: MainEntity, mesh_allocator: &MeshAllocator, mesh_material_ids: &RenderMaterialInstances, @@ -1171,6 +1171,8 @@ impl RenderMeshInstanceGpuBuilder { Some(MaterialBindingId::default()) }; + self.shared.material_bindings_index = mesh_material_binding_id.unwrap_or_default(); + let lightmap_slot = match render_lightmaps.render_lightmaps.get(&entity) { Some(render_lightmap) => u16::from(*render_lightmap.slot_index), None => u16::MAX, @@ -1179,12 +1181,7 @@ impl RenderMeshInstanceGpuBuilder { .render_lightmaps .get(&entity) .map(|lightmap| lightmap.slab_index); - - let mut shared = self.shared; - shared.lightmap_slab_index = lightmap_slab_index; - shared.material_bindings_index = mesh_material_binding_id.unwrap_or_default(); - - let material_bindings_index = shared.material_bindings_index; + self.shared.lightmap_slab_index = lightmap_slab_index; // Create the mesh input uniform. let mesh_input_uniform = MeshInputUniform { @@ -1201,18 +1198,19 @@ impl RenderMeshInstanceGpuBuilder { vertex_count }, current_skin_index, - material_and_lightmap_bind_group_slot: u32::from(material_bindings_index.slot) - | ((lightmap_slot as u32) << 16), - tag: shared.tag, + material_and_lightmap_bind_group_slot: u32::from( + self.shared.material_bindings_index.slot, + ) | ((lightmap_slot as u32) << 16), + tag: self.shared.tag, pad: 0, }; let world_from_local = &self.world_from_local; let center = - world_from_local.matrix3.mul_vec3(shared.center) + world_from_local.translation; + world_from_local.matrix3.mul_vec3(self.shared.center) + world_from_local.translation; RenderMeshInstanceGpuPrepared { - shared, + shared: self.shared, mesh_input_uniform, center, material_ready: mesh_material_binding_id.is_some(), From 860d3cf75ac66e4ca4539071e077ae4a2d23c042 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 16:17:59 -0800 Subject: [PATCH 08/21] better handle mesh re-extraction, fix single threaded deadlock, improve docs --- crates/bevy_pbr/src/render/mesh.rs | 319 +++++++++++++++++------------ 1 file changed, 190 insertions(+), 129 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index a06848ab343b7..292435574b5c8 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1136,7 +1136,21 @@ impl RenderMeshInstanceGpuBuilder { render_lightmaps: &RenderLightmaps, skin_uniforms: &SkinUniforms, timestamp: FrameCount, - ) -> RenderMeshInstanceGpuPrepared { + ) -> Option { + // Look up the material index. If we couldn't fetch the material index, + // then the material hasn't been prepared yet, perhaps because it hasn't + // yet loaded. In that case, we return None so that + // `collect_meshes_for_gpu_building` will add the mesh to + // `meshes_to_reextract_next_frame` and bail. + let mesh_material = mesh_material_ids.mesh_material(entity); + let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() { + render_material_bindings.get(&mesh_material).copied()? + } else { + // Use a dummy material binding ID. + MaterialBindingId::default() + }; + self.shared.material_bindings_index = mesh_material_binding_id; + let (first_vertex_index, vertex_count) = match mesh_allocator.mesh_vertex_slice(&self.shared.mesh_asset_id) { Some(mesh_vertex_slice) => ( @@ -1159,20 +1173,6 @@ impl RenderMeshInstanceGpuBuilder { None => u32::MAX, }; - // Look up the material index. 
If we couldn't fetch the material index, - // then the material hasn't been prepared yet, perhaps because it hasn't - // yet loaded. In that case, add the mesh to - // `meshes_to_reextract_next_frame` and bail. - let mesh_material = mesh_material_ids.mesh_material(entity); - let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() { - render_material_bindings.get(&mesh_material).copied() - } else { - // Use a dummy material binding ID. - Some(MaterialBindingId::default()) - }; - - self.shared.material_bindings_index = mesh_material_binding_id.unwrap_or_default(); - let lightmap_slot = match render_lightmaps.render_lightmaps.get(&entity) { Some(render_lightmap) => u16::from(*render_lightmap.slot_index), None => u16::MAX, @@ -1209,12 +1209,11 @@ impl RenderMeshInstanceGpuBuilder { let center = world_from_local.matrix3.mul_vec3(self.shared.center) + world_from_local.translation; - RenderMeshInstanceGpuPrepared { + Some(RenderMeshInstanceGpuPrepared { shared: self.shared, mesh_input_uniform, center, - material_ready: mesh_material_binding_id.is_some(), - } + }) } } @@ -1222,7 +1221,6 @@ pub struct RenderMeshInstanceGpuPrepared { shared: RenderMeshInstanceShared, mesh_input_uniform: MeshInputUniform, center: Vec3, - material_ready: bool, } impl RenderMeshInstanceGpuPrepared { @@ -1234,13 +1232,7 @@ impl RenderMeshInstanceGpuPrepared { render_mesh_instances: &mut MainEntityHashMap, current_input_buffer: &mut InstanceInputUniformBuffer, previous_input_buffer: &mut InstanceInputUniformBuffer, - meshes_to_reextract_next_frame: &mut MeshesToReextractNextFrame, ) -> Option { - if !self.material_ready { - meshes_to_reextract_next_frame.insert(entity); - return None; - } - // Did the last frame contain this entity as well? let current_uniform_index; match render_mesh_instances.entry(entity) { @@ -1774,6 +1766,16 @@ pub fn collect_meshes_for_gpu_building( skin_uniforms: Res, frame_count: Res, mut meshes_to_reextract_next_frame: ResMut, + prepared_chunk: Local< + Parallel< + Vec<( + MainEntity, + RenderMeshInstanceGpuPrepared, + Option, + )>, + >, + >, + reextract_chunk: Local>>, ) { let RenderMeshInstances::GpuBuilding(render_mesh_instances) = render_mesh_instances.into_inner() @@ -1793,65 +1795,74 @@ pub fn collect_meshes_for_gpu_building( previous_input_buffer.clear(); - let (cpu_tx, cpu_rx) = - std::sync::mpsc::channel::>(); - let (gpu_tx, gpu_rx) = std::sync::mpsc::channel::< - Vec<(MainEntity, RenderMeshInstanceGpuPrepared, MeshCullingData)>, - >(); - let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); - + /// The size of batches of data sent through the channel from parallel workers to the consumer. + /// This was tuned based on benchmarks across a wide range of entity counts. const CHUNK_SIZE: usize = 10_000; - // Build the [`RenderMeshInstance`]s and [`MeshInputUniform`]s. 
- ComputeTaskPool::get().scope(|scope| { - let cpu_culling_changed = cpu_tx; - let gpu_culling_changed = gpu_tx; - let mesh_allocator = &mesh_allocator; - let mesh_material_ids = &mesh_material_ids; - let render_material_bindings = &render_material_bindings; - let render_lightmaps = &render_lightmaps; - let skin_uniforms = &skin_uniforms; - let frame_count = *frame_count; - - scope.spawn(async move { - let _g = info_span!("update_mesh_instances").entered(); - - for (entity, prepared) in cpu_rx.iter().flatten() { - prepared.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - &mut meshes_to_reextract_next_frame, - ); - } - - for (entity, prepared, mesh_culling_builder) in gpu_rx.iter().flatten() { - let Some(instance_data_index) = prepared.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - &mut meshes_to_reextract_next_frame, - ) else { - continue; - }; - mesh_culling_builder + // Channels used by parallel workers to send data to the single consumer. + let (prepared_tx, prepared_rx) = std::sync::mpsc::channel::< + Vec<( + MainEntity, + RenderMeshInstanceGpuPrepared, + Option, + )>, + >(); + let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); + let (reextract_tx, reextract_rx) = std::sync::mpsc::channel::>(); + + // Reference data shared between tasks + let prepared_chunk = &prepared_chunk; + let reextract_chunk = &reextract_chunk; + let mesh_allocator = &mesh_allocator; + let mesh_material_ids = &mesh_material_ids; + let render_material_bindings = &render_material_bindings; + let render_lightmaps = &render_lightmaps; + let skin_uniforms = &skin_uniforms; + let frame_count = *frame_count; + + // A single worker that consumes the meshes prepared in parallel by multiple producers. + let mesh_consumer_worker = &mut Some(move || { + let _span = info_span!("prepared_mesh_consumer").entered(); + for (entity, prepared, mesh_culling_builder) in prepared_rx.iter().flatten() { + let Some(instance_data_index) = prepared.update( + entity, + &mut *render_mesh_instances, + current_input_buffer, + previous_input_buffer, + ) else { + continue; + }; + if let Some(mesh_culling_data) = mesh_culling_builder { + mesh_culling_data .update(&mut mesh_culling_data_buffer, instance_data_index as usize); } + } + for entity in removed_rx.iter().flatten() { + remove_mesh_input_uniform(entity, &mut *render_mesh_instances, current_input_buffer); + } + for entity in reextract_rx.iter().flatten() { + meshes_to_reextract_next_frame.insert(entity); + } + // Buffers can't be empty. Make sure there's something in the previous input buffer. + previous_input_buffer.ensure_nonempty(); + }); - for entity in removed_rx.iter().flatten() { - remove_mesh_input_uniform( - entity, - &mut *render_mesh_instances, - current_input_buffer, - ); - } - - // Buffers can't be empty. Make sure there's something in the previous input buffer. - previous_input_buffer.ensure_nonempty(); - }); - + // Spawn workers on the taskpool to prepare and update meshes in parallel. + let taskpool = ComputeTaskPool::get(); + let is_single_threaded = taskpool.thread_num() == 1; + taskpool.scope(|scope| { + // If we have multiple threads available, we can spawn a dedicated consumer worker before + // the parallel producers start sending data without any risk of deadlocking. + // + // This worker is the bottleneck of mesh preparation and can only run serially, so we want + // it to start working immediately. 
As soon as the parallel workers produce chunks of + // prepared meshes, this worker will consume them and update the GPU buffers. + if !is_single_threaded && let Some(mut consumer_worker) = mesh_consumer_worker.take() { + scope.spawn(async move { consumer_worker() }); + } + + // Iterate through each queue, spawning a task for each queue. This loop completes quickly + // as it does very little work, it is just spawning and moving data into tasks in a loop. for queue in render_mesh_instance_queues.iter_mut() { match *queue { RenderMeshInstanceGpuQueue::None => { @@ -1862,32 +1873,49 @@ pub fn collect_meshes_for_gpu_building( ref mut changed, ref mut removed, } => { - let _g = info_span!("scope_outer").entered(); - let cpu_culling = cpu_culling_changed.clone(); - if !removed.is_empty() { - removed_tx.send(core::mem::take(removed)).ok(); - } + let prepared_tx = prepared_tx.clone(); + let reextract_tx = reextract_tx.clone(); + let removed_tx = removed_tx.clone(); scope.spawn(async move { - let _g = info_span!("par_cpu_culling_scope").entered(); - while !changed.is_empty() { - let chunk = changed - .drain(..CHUNK_SIZE.min(changed.len())) - .map(|(entity, mesh_instance_builder)| { - ( - entity, - mesh_instance_builder.prepare( - entity, - mesh_allocator, - mesh_material_ids, - render_material_bindings, - render_lightmaps, - skin_uniforms, - frame_count, - ), - ) - }) - .collect(); - cpu_culling.send(chunk).ok(); + let _span = info_span!("prepared_mesh_producer").entered(); + let prepared_chunk = &mut prepared_chunk.borrow_local_mut(); + let reextract_chunk = &mut reextract_chunk.borrow_local_mut(); + changed + .drain(..) + .for_each(|(entity, mesh_instance_builder)| { + match mesh_instance_builder.prepare( + entity, + mesh_allocator, + mesh_material_ids, + render_material_bindings, + render_lightmaps, + skin_uniforms, + frame_count, + ) { + Some(prepared) => prepared_chunk.push((entity, prepared, None)), + None => reextract_chunk.push(entity), + } + + // Once a local batch of work has grown large enough, we send it to + // the consumer to start processing immediately, so it does not need + // to wait for us to process the entire queue. 
+ if prepared_chunk.len() >= CHUNK_SIZE { + prepared_tx.send(core::mem::take(prepared_chunk)).ok(); + } + if reextract_chunk.len() >= CHUNK_SIZE { + reextract_tx.send(core::mem::take(reextract_chunk)).ok(); + } + }); + + // Send any remaining data to be processed + if !prepared_chunk.is_empty() { + prepared_tx.send(core::mem::take(prepared_chunk)).ok(); + } + if !reextract_chunk.is_empty() { + reextract_tx.send(core::mem::take(reextract_chunk)).ok(); + } + if !removed.is_empty() { + removed_tx.send(core::mem::take(removed)).ok(); } }); } @@ -1896,40 +1924,73 @@ pub fn collect_meshes_for_gpu_building( ref mut changed, ref mut removed, } => { - let _g = info_span!("scope_outer").entered(); - let gpu_culling = gpu_culling_changed.clone(); - if !removed.is_empty() { - removed_tx.send(core::mem::take(removed)).ok(); - } + let prepared_tx = prepared_tx.clone(); + let reextract_tx = reextract_tx.clone(); + let removed_tx = removed_tx.clone(); scope.spawn(async move { - let _g = info_span!("par_gpu_culling_scope").entered(); - while !changed.is_empty() { - let chunk = changed - .drain(..CHUNK_SIZE.min(changed.len())) - .map(|(entity, mesh_instance_builder, mesh_culling_builder)| { - ( + let _span = info_span!("prepared_mesh_producer").entered(); + let prepared_chunk = &mut prepared_chunk.borrow_local_mut(); + let reextract_chunk = &mut reextract_chunk.borrow_local_mut(); + changed.drain(..).for_each( + |(entity, mesh_instance_builder, mesh_culling_builder)| { + match mesh_instance_builder.prepare( + entity, + mesh_allocator, + mesh_material_ids, + render_material_bindings, + render_lightmaps, + skin_uniforms, + frame_count, + ) { + Some(prepared) => prepared_chunk.push(( entity, - mesh_instance_builder.prepare( - entity, - mesh_allocator, - mesh_material_ids, - render_material_bindings, - render_lightmaps, - skin_uniforms, - frame_count, - ), - mesh_culling_builder, - ) - }) - .collect(); - gpu_culling.send(chunk).ok(); + prepared, + Some(mesh_culling_builder), + )), + None => reextract_chunk.push(entity), + } + + // Once a local batch of work has grown large enough, we send it to + // the consumer to start processing immediately, so it does not need + // to wait for us to process the entire queue. + if prepared_chunk.len() >= CHUNK_SIZE { + prepared_tx.send(core::mem::take(prepared_chunk)).ok(); + } + if reextract_chunk.len() >= CHUNK_SIZE { + reextract_tx.send(core::mem::take(reextract_chunk)).ok(); + } + }, + ); + + // Send any remaining data to be processed + if !prepared_chunk.is_empty() { + prepared_tx.send(core::mem::take(prepared_chunk)).ok(); + } + if !reextract_chunk.is_empty() { + reextract_tx.send(core::mem::take(reextract_chunk)).ok(); + } + if !removed.is_empty() { + removed_tx.send(core::mem::take(removed)).ok(); } }); } } } + + // Drop the senders owned by the scope, so the only senders left are those captured by the + // spawned tasks. When the tasks are complete, the channels will close, and the consumer + // will finish. Without this, the scope would deadlock on the blocked consumer. + drop(prepared_tx); + drop(reextract_tx); drop(removed_tx); }); + + // In single threaded scenarios, we cannot spawn the worker first because it will deadlock, + // The worker would block the thread and prevent the mesh prepare tasks from ever starting and + // sending any data to be consumed. + if is_single_threaded && let Some(mut consumer_worker) = mesh_consumer_worker.take() { + consumer_worker(); + } } /// All data needed to construct a pipeline for rendering 3D meshes. 
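
The scheduling pattern this patch introduces is easier to see in isolation. The following is a minimal, self-contained sketch of that pattern — not part of the patch itself — using a plain `std::sync::mpsc` channel and placeholder `u32` work items: when more than one thread is available the serial consumer is spawned into the scope first so it drains batches as producers send them, and on a single-threaded pool it runs after the scope so its blocking receive cannot starve the producers.

```rust
use bevy_tasks::ComputeTaskPool;

// Illustrative only: `queues` stands in for the per-view mesh instance queues, and the
// inner loop body stands in for the serial buffer updates.
fn produce_and_consume(queues: Vec<Vec<u32>>) {
    let (tx, rx) = std::sync::mpsc::channel::<Vec<u32>>();

    // The serial part of the work. `rx.iter()` blocks until every sender has been dropped.
    let mut consumer = Some(move || {
        for batch in rx.iter() {
            for _item in batch {
                // ...order-dependent work on shared state goes here...
            }
        }
    });

    let task_pool = ComputeTaskPool::get();
    let is_single_threaded = task_pool.thread_num() == 1;

    task_pool.scope(|scope| {
        // With more than one thread available, start the consumer immediately so it can
        // drain batches while the producers are still running.
        if !is_single_threaded && let Some(consumer) = consumer.take() {
            scope.spawn(async move { consumer() });
        }

        // The parallel part: one producer task per queue, each sending its batch.
        for queue in queues {
            let tx = tx.clone();
            scope.spawn(async move {
                tx.send(queue).ok();
            });
        }

        // Drop the scope-owned sender so the channel closes once all producers finish.
        drop(tx);
    });

    // On a single thread the consumer must run after the scope: spawning it first would
    // block the only thread before any producer could send.
    if is_single_threaded && let Some(consumer) = consumer.take() {
        consumer();
    }
}
```
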
From 8917e4d78a2aac4cdfb1a5124a0e9396c9370d85 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 16:35:41 -0800 Subject: [PATCH 09/21] Fix doc links --- crates/bevy_pbr/src/render/mesh.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 292435574b5c8..abcae4b7adcd7 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -837,7 +837,7 @@ pub struct MeshesToReextractNextFrame(MainEntityHashSet); impl RenderMeshInstanceShared { /// A gpu builder will provide the mesh instance id - /// during [`RenderMeshInstanceGpuBuilder::update`]. + /// during [`RenderMeshInstanceGpuPrepared::update`]. fn for_gpu_building( previous_transform: Option<&PreviousGlobalTransform>, mesh: &Mesh3d, @@ -859,7 +859,7 @@ impl RenderMeshInstanceShared { ) } - /// The cpu builder does not have an equivalent [`RenderMeshInstanceGpuBuilder::update`]. + /// The cpu builder does not have an equivalent [`RenderMeshInstanceGpuBuilder`]. fn for_cpu_building( previous_transform: Option<&PreviousGlobalTransform>, mesh: &Mesh3d, From 6f785cbeaf2002a024a5b2963bf6b60710813f9e Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 20:05:47 -0800 Subject: [PATCH 10/21] Tune chunk size --- crates/bevy_pbr/src/render/mesh.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index abcae4b7adcd7..d15b6d8bdb204 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1797,7 +1797,7 @@ pub fn collect_meshes_for_gpu_building( /// The size of batches of data sent through the channel from parallel workers to the consumer. /// This was tuned based on benchmarks across a wide range of entity counts. - const CHUNK_SIZE: usize = 10_000; + const CHUNK_SIZE: usize = 1024; // Channels used by parallel workers to send data to the single consumer. let (prepared_tx, prepared_rx) = std::sync::mpsc::channel::< From eca0c308f44a92203c77d69d783b205b87fdc461 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Mon, 29 Dec 2025 20:09:54 -0800 Subject: [PATCH 11/21] Refactor order and comments for readability --- crates/bevy_pbr/src/render/mesh.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index d15b6d8bdb204..1bf2b530b9bba 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1792,7 +1792,6 @@ pub fn collect_meshes_for_gpu_building( previous_input_buffer, .. } = batched_instance_buffers.into_inner(); - previous_input_buffer.clear(); /// The size of batches of data sent through the channel from parallel workers to the consumer. @@ -1810,17 +1809,8 @@ pub fn collect_meshes_for_gpu_building( let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); let (reextract_tx, reextract_rx) = std::sync::mpsc::channel::>(); - // Reference data shared between tasks - let prepared_chunk = &prepared_chunk; - let reextract_chunk = &reextract_chunk; - let mesh_allocator = &mesh_allocator; - let mesh_material_ids = &mesh_material_ids; - let render_material_bindings = &render_material_bindings; - let render_lightmaps = &render_lightmaps; - let skin_uniforms = &skin_uniforms; - let frame_count = *frame_count; - // A single worker that consumes the meshes prepared in parallel by multiple producers. 
+ // This part of the workload cannot be parallelized due to the shared mutable state. let mesh_consumer_worker = &mut Some(move || { let _span = info_span!("prepared_mesh_consumer").entered(); for (entity, prepared, mesh_culling_builder) in prepared_rx.iter().flatten() { @@ -1847,6 +1837,16 @@ pub fn collect_meshes_for_gpu_building( previous_input_buffer.ensure_nonempty(); }); + // Reference data shared between tasks + let prepared_chunk = &prepared_chunk; + let reextract_chunk = &reextract_chunk; + let mesh_allocator = &mesh_allocator; + let mesh_material_ids = &mesh_material_ids; + let render_material_bindings = &render_material_bindings; + let render_lightmaps = &render_lightmaps; + let skin_uniforms = &skin_uniforms; + let frame_count = *frame_count; + // Spawn workers on the taskpool to prepare and update meshes in parallel. let taskpool = ComputeTaskPool::get(); let is_single_threaded = taskpool.thread_num() == 1; From d1cd1e5173dc39b08904ef3ae60cab413ce21256 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Thu, 1 Jan 2026 18:16:46 -0800 Subject: [PATCH 12/21] Refactor buffered channel logic into a struct. --- crates/bevy_pbr/src/render/mesh.rs | 256 +++++++++++++++++++---------- 1 file changed, 168 insertions(+), 88 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 1bf2b530b9bba..7498b214348b4 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -1750,8 +1750,139 @@ pub fn set_mesh_motion_vector_flags( } } +/// A channel that uses thread locals to buffer messages for efficient parallel processing. +/// +/// Cache this channel in a [`Local`] between system runs to reuse allocated memory. +/// +/// This is faster than sending each message individually into a channel when communicating between +/// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are +/// concurrently sending data into the channel, enabling you to run serial processing concurrently +/// with the parallel processing step of an algorithm. +/// +/// ### Usage +/// +/// ``` +/// use bevy_app::{App, ScheduleRunnerPlugin, TaskPoolPlugin, Update}; +/// use bevy_diagnostic::FrameCountPlugin; +/// use bevy_ecs::system::Local; +/// use bevy_pbr::BufferedChannel; +/// use bevy_tasks::{ComputeTaskPool, TaskPool, TaskPoolBuilder}; +/// +/// App::new().add_plugins(TaskPoolPlugin::default()).add_systems(Update, parallel_system).update(); +/// +/// fn parallel_system(channel: Local>) { +/// // Every update, set up the channel to get a sender and receiver. +/// let (rx, tx) = channel.setup(); +/// +/// ComputeTaskPool::get().scope(|scope| { +/// // Spawn a few producing tasks in parallel that send data using a buffered channel +/// for _ in 0..5 { +/// let tx = tx.clone(); +/// scope.spawn(async move { +/// for i in 0..10_000 { +/// tx.send(i); +/// } +/// }); +/// } +/// drop(tx); +/// +/// // Spawn a single consumer task that reads from the producers as +/// scope.spawn(async move { +/// // Note we have to `flatten()`, as we are receiving batches of values in `Vec`s. +/// let total: u32 = rx.iter().flatten().sum(); +/// assert_eq!(total, 249_975_000); +/// }); +/// }); +/// } +/// ``` +pub struct BufferedChannel { + /// The size of batches of buffered data sent through the channel. + pub chunk_size: usize, + /// Thread-local buffer of `T`. 
+ buffer: Parallel>, +} + +impl Default for BufferedChannel { + fn default() -> Self { + Self { + // This was tuned based on benchmarks across a wide range of sizes. + chunk_size: 1024, + buffer: Parallel::default(), + } + } +} + +/// A [`BufferedChannel`] sender that buffers messages in a thread local `Vec`, flushing messages +/// from the thread local when the buffered sender is dropped. +pub struct BufferedSender<'a, T: Send> { + chunk_size: usize, + parallel: &'a Parallel>, + // Use a `std` channel, the speed doesn't matter much because we are buffering. + tx: std::sync::mpsc::Sender>, +} + +impl BufferedChannel { + pub fn setup(&self) -> (std::sync::mpsc::Receiver>, BufferedSender<'_, T>) { + let (tx, rx) = std::sync::mpsc::channel(); + ( + rx, + BufferedSender { + chunk_size: self.chunk_size, + parallel: &self.buffer, + tx, + }, + ) + } +} + +impl<'a, T: Send> BufferedSender<'a, T> { + /// Send an item. This is buffered and will only be sent into the channel once the number of + /// items in the thread local has exceeded [`BufferedChannel::chunk_size`], or the sender is + /// dropped and the thread local buffer is automatically flushed. + pub fn send(&self, item: T) { + let mut local = self.parallel.borrow_local_mut(); + local.push(item); + if local.len() >= self.chunk_size { + let _ = self.tx.send(core::mem::take(&mut *local)); + } + } + + fn flush(&self) { + let mut local = self.parallel.borrow_local_mut(); + if !local.is_empty() { + let _ = self.tx.send(core::mem::take(&mut *local)); + } + } +} + +impl<'a, T: Send> Clone for BufferedSender<'a, T> { + fn clone(&self) -> Self { + Self { + chunk_size: self.chunk_size, + parallel: self.parallel, + tx: self.tx.clone(), + } + } +} + +impl<'a, T: Send> Drop for BufferedSender<'a, T> { + fn drop(&mut self) { + self.flush(); + } +} + +#[derive(Default)] +pub struct GpuMeshBuildingChunks { + prepared: BufferedChannel<( + MainEntity, + RenderMeshInstanceGpuPrepared, + Option, + )>, + reextract: BufferedChannel, + removed: BufferedChannel, +} + /// Creates the [`RenderMeshInstanceGpu`]s and [`MeshInputUniform`]s when GPU -/// mesh uniforms are built. pub fn collect_meshes_for_gpu_building( render_mesh_instances: ResMut, batched_instance_buffers: ResMut< @@ -1766,16 +1897,7 @@ pub fn collect_meshes_for_gpu_building( skin_uniforms: Res, frame_count: Res, mut meshes_to_reextract_next_frame: ResMut, - prepared_chunk: Local< - Parallel< - Vec<( - MainEntity, - RenderMeshInstanceGpuPrepared, - Option, - )>, - >, - >, - reextract_chunk: Local>>, + chunks: Local, ) { let RenderMeshInstances::GpuBuilding(render_mesh_instances) = render_mesh_instances.into_inner() @@ -1794,25 +1916,18 @@ pub fn collect_meshes_for_gpu_building( } = batched_instance_buffers.into_inner(); previous_input_buffer.clear(); - /// The size of batches of data sent through the channel from parallel workers to the consumer. - /// This was tuned based on benchmarks across a wide range of entity counts. - const CHUNK_SIZE: usize = 1024; - // Channels used by parallel workers to send data to the single consumer. 
- let (prepared_tx, prepared_rx) = std::sync::mpsc::channel::< - Vec<( - MainEntity, - RenderMeshInstanceGpuPrepared, - Option, - )>, - >(); - let (removed_tx, removed_rx) = std::sync::mpsc::channel::>(); - let (reextract_tx, reextract_rx) = std::sync::mpsc::channel::>(); + let (prepared_rx, prepared_tx) = chunks.prepared.setup(); + let (reextract_rx, reextract_tx) = chunks.reextract.setup(); + let (removed_rx, removed_tx) = chunks.removed.setup(); // A single worker that consumes the meshes prepared in parallel by multiple producers. // This part of the workload cannot be parallelized due to the shared mutable state. let mesh_consumer_worker = &mut Some(move || { let _span = info_span!("prepared_mesh_consumer").entered(); + // In an ideal world, these three for loops would be a single loop, so we can process + // incoming batches no matter which channel they come into, instead of one at a time. + // Probably doable using async_channel and awaiting all three receivers at once. for (entity, prepared, mesh_culling_builder) in prepared_rx.iter().flatten() { let Some(instance_data_index) = prepared.update( entity, @@ -1838,8 +1953,6 @@ pub fn collect_meshes_for_gpu_building( }); // Reference data shared between tasks - let prepared_chunk = &prepared_chunk; - let reextract_chunk = &reextract_chunk; let mesh_allocator = &mesh_allocator; let mesh_material_ids = &mesh_material_ids; let render_material_bindings = &render_material_bindings; @@ -1878,44 +1991,28 @@ pub fn collect_meshes_for_gpu_building( let removed_tx = removed_tx.clone(); scope.spawn(async move { let _span = info_span!("prepared_mesh_producer").entered(); - let prepared_chunk = &mut prepared_chunk.borrow_local_mut(); - let reextract_chunk = &mut reextract_chunk.borrow_local_mut(); changed .drain(..) - .for_each(|(entity, mesh_instance_builder)| { - match mesh_instance_builder.prepare( - entity, - mesh_allocator, - mesh_material_ids, - render_material_bindings, - render_lightmaps, - skin_uniforms, - frame_count, - ) { - Some(prepared) => prepared_chunk.push((entity, prepared, None)), - None => reextract_chunk.push(entity), - } - - // Once a local batch of work has grown large enough, we send it to - // the consumer to start processing immediately, so it does not need - // to wait for us to process the entire queue. - if prepared_chunk.len() >= CHUNK_SIZE { - prepared_tx.send(core::mem::take(prepared_chunk)).ok(); - } - if reextract_chunk.len() >= CHUNK_SIZE { - reextract_tx.send(core::mem::take(reextract_chunk)).ok(); - } - }); + .for_each( + |(entity, mesh_instance_builder)| match mesh_instance_builder + .prepare( + entity, + mesh_allocator, + mesh_material_ids, + render_material_bindings, + render_lightmaps, + skin_uniforms, + frame_count, + ) { + Some(prepared) => { + prepared_tx.send((entity, prepared, None)); + } + None => reextract_tx.send(entity), + }, + ); - // Send any remaining data to be processed - if !prepared_chunk.is_empty() { - prepared_tx.send(core::mem::take(prepared_chunk)).ok(); - } - if !reextract_chunk.is_empty() { - reextract_tx.send(core::mem::take(reextract_chunk)).ok(); - } - if !removed.is_empty() { - removed_tx.send(core::mem::take(removed)).ok(); + for entity in removed.drain(..) 
{ + removed_tx.send(entity); } }); } @@ -1929,8 +2026,6 @@ pub fn collect_meshes_for_gpu_building( let removed_tx = removed_tx.clone(); scope.spawn(async move { let _span = info_span!("prepared_mesh_producer").entered(); - let prepared_chunk = &mut prepared_chunk.borrow_local_mut(); - let reextract_chunk = &mut reextract_chunk.borrow_local_mut(); changed.drain(..).for_each( |(entity, mesh_instance_builder, mesh_culling_builder)| { match mesh_instance_builder.prepare( @@ -1942,35 +2037,20 @@ pub fn collect_meshes_for_gpu_building( skin_uniforms, frame_count, ) { - Some(prepared) => prepared_chunk.push(( - entity, - prepared, - Some(mesh_culling_builder), - )), - None => reextract_chunk.push(entity), - } - - // Once a local batch of work has grown large enough, we send it to - // the consumer to start processing immediately, so it does not need - // to wait for us to process the entire queue. - if prepared_chunk.len() >= CHUNK_SIZE { - prepared_tx.send(core::mem::take(prepared_chunk)).ok(); - } - if reextract_chunk.len() >= CHUNK_SIZE { - reextract_tx.send(core::mem::take(reextract_chunk)).ok(); + Some(prepared) => { + prepared_tx.send(( + entity, + prepared, + Some(mesh_culling_builder), + )); + } + None => reextract_tx.send(entity), } }, ); - // Send any remaining data to be processed - if !prepared_chunk.is_empty() { - prepared_tx.send(core::mem::take(prepared_chunk)).ok(); - } - if !reextract_chunk.is_empty() { - reextract_tx.send(core::mem::take(reextract_chunk)).ok(); - } - if !removed.is_empty() { - removed_tx.send(core::mem::take(removed)).ok(); + for entity in removed.drain(..) { + removed_tx.send(entity); } }); } From 773592489f9ba76c72f97ae0bd97f97b9f330bfe Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Thu, 1 Jan 2026 20:16:07 -0800 Subject: [PATCH 13/21] Async buffered channel --- crates/bevy_pbr/src/render/mesh.rs | 211 +++++----------- crates/bevy_utils/Cargo.toml | 6 +- crates/bevy_utils/src/buffered_channel.rs | 279 ++++++++++++++++++++++ crates/bevy_utils/src/lib.rs | 3 + 4 files changed, 341 insertions(+), 158 deletions(-) create mode 100644 crates/bevy_utils/src/buffered_channel.rs diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 7498b214348b4..4c7bc9a80982d 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -61,7 +61,7 @@ use bevy_render::{ }; use bevy_shader::{load_shader_library, Shader, ShaderDefVal, ShaderSettings}; use bevy_transform::components::GlobalTransform; -use bevy_utils::{default, Parallel, TypeIdMap}; +use bevy_utils::{default, BufferedChannel, Parallel, TypeIdMap}; use core::any::TypeId; use core::mem::size_of; use material_bind_groups::MaterialBindingId; @@ -1750,127 +1750,6 @@ pub fn set_mesh_motion_vector_flags( } } -/// A channel that uses thread locals to buffer messages for efficient parallel processing. -/// -/// Cache this channel in a [`Local`] between system runs to reuse allocated memory. -/// -/// This is faster than sending each message individually into a channel when communicating between -/// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are -/// concurrently sending data into the channel, enabling you to run serial processing concurrently -/// with the parallel processing step of an algorithm. 
-/// -/// ### Usage -/// -/// ``` -/// use bevy_app::{App, ScheduleRunnerPlugin, TaskPoolPlugin, Update}; -/// use bevy_diagnostic::FrameCountPlugin; -/// use bevy_ecs::system::Local; -/// use bevy_pbr::BufferedChannel; -/// use bevy_tasks::{ComputeTaskPool, TaskPool, TaskPoolBuilder}; -/// -/// App::new().add_plugins(TaskPoolPlugin::default()).add_systems(Update, parallel_system).update(); -/// -/// fn parallel_system(channel: Local>) { -/// // Every update, set up the channel to get a sender and receiver. -/// let (rx, tx) = channel.setup(); -/// -/// ComputeTaskPool::get().scope(|scope| { -/// // Spawn a few producing tasks in parallel that send data using a buffered channel -/// for _ in 0..5 { -/// let tx = tx.clone(); -/// scope.spawn(async move { -/// for i in 0..10_000 { -/// tx.send(i); -/// } -/// }); -/// } -/// drop(tx); -/// -/// // Spawn a single consumer task that reads from the producers as -/// scope.spawn(async move { -/// // Note we have to `flatten()`, as we are receiving batches of values in `Vec`s. -/// let total: u32 = rx.iter().flatten().sum(); -/// assert_eq!(total, 249_975_000); -/// }); -/// }); -/// } -/// ``` -pub struct BufferedChannel { - /// The size of batches of buffered data sent through the channel. - pub chunk_size: usize, - /// Thread-local buffer of `T`. - buffer: Parallel>, -} - -impl Default for BufferedChannel { - fn default() -> Self { - Self { - // This was tuned based on benchmarks across a wide range of sizes. - chunk_size: 1024, - buffer: Parallel::default(), - } - } -} - -/// A [`BufferedChannel`] sender that buffers messages in a thread local `Vec`, flushing messages -/// from the thread local when the buffered sender is dropped. -pub struct BufferedSender<'a, T: Send> { - chunk_size: usize, - parallel: &'a Parallel>, - // Use a `std` channel, the speed doesn't matter much because we are buffering. - tx: std::sync::mpsc::Sender>, -} - -impl BufferedChannel { - pub fn setup(&self) -> (std::sync::mpsc::Receiver>, BufferedSender<'_, T>) { - let (tx, rx) = std::sync::mpsc::channel(); - ( - rx, - BufferedSender { - chunk_size: self.chunk_size, - parallel: &self.buffer, - tx, - }, - ) - } -} - -impl<'a, T: Send> BufferedSender<'a, T> { - /// Send an item. This is buffered and will only be sent into the channel once the number of - /// items in the thread local has exceeded [`BufferedChannel::chunk_size`], or the sender is - /// dropped and the thread local buffer is automatically flushed. - pub fn send(&self, item: T) { - let mut local = self.parallel.borrow_local_mut(); - local.push(item); - if local.len() >= self.chunk_size { - let _ = self.tx.send(core::mem::take(&mut *local)); - } - } - - fn flush(&self) { - let mut local = self.parallel.borrow_local_mut(); - if !local.is_empty() { - let _ = self.tx.send(core::mem::take(&mut *local)); - } - } -} - -impl<'a, T: Send> Clone for BufferedSender<'a, T> { - fn clone(&self) -> Self { - Self { - chunk_size: self.chunk_size, - parallel: self.parallel, - tx: self.tx.clone(), - } - } -} - -impl<'a, T: Send> Drop for BufferedSender<'a, T> { - fn drop(&mut self) { - self.flush(); - } -} - #[derive(Default)] pub struct GpuMeshBuildingChunks { prepared: BufferedChannel<( @@ -1917,9 +1796,9 @@ pub fn collect_meshes_for_gpu_building( previous_input_buffer.clear(); // Channels used by parallel workers to send data to the single consumer. 
- let (prepared_rx, prepared_tx) = chunks.prepared.setup(); - let (reextract_rx, reextract_tx) = chunks.reextract.setup(); - let (removed_rx, removed_tx) = chunks.removed.setup(); + let (prepared_rx, prepared_tx) = chunks.prepared.unbounded(); + let (reextract_rx, reextract_tx) = chunks.reextract.unbounded(); + let (removed_rx, removed_tx) = chunks.removed.unbounded(); // A single worker that consumes the meshes prepared in parallel by multiple producers. // This part of the workload cannot be parallelized due to the shared mutable state. @@ -1928,25 +1807,35 @@ pub fn collect_meshes_for_gpu_building( // In an ideal world, these three for loops would be a single loop, so we can process // incoming batches no matter which channel they come into, instead of one at a time. // Probably doable using async_channel and awaiting all three receivers at once. - for (entity, prepared, mesh_culling_builder) in prepared_rx.iter().flatten() { - let Some(instance_data_index) = prepared.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - ) else { - continue; - }; - if let Some(mesh_culling_data) = mesh_culling_builder { - mesh_culling_data - .update(&mut mesh_culling_data_buffer, instance_data_index as usize); + while let Ok(batch) = prepared_rx.recv_blocking() { + for (entity, prepared, mesh_culling_builder) in batch { + let Some(instance_data_index) = prepared.update( + entity, + &mut *render_mesh_instances, + current_input_buffer, + previous_input_buffer, + ) else { + continue; + }; + if let Some(mesh_culling_data) = mesh_culling_builder { + mesh_culling_data + .update(&mut mesh_culling_data_buffer, instance_data_index as usize); + } } } - for entity in removed_rx.iter().flatten() { - remove_mesh_input_uniform(entity, &mut *render_mesh_instances, current_input_buffer); + while let Ok(batch) = removed_rx.recv_blocking() { + for entity in batch { + remove_mesh_input_uniform( + entity, + &mut *render_mesh_instances, + current_input_buffer, + ); + } } - for entity in reextract_rx.iter().flatten() { - meshes_to_reextract_next_frame.insert(entity); + while let Ok(batch) = reextract_rx.recv_blocking() { + for entity in batch { + meshes_to_reextract_next_frame.insert(entity); + } } // Buffers can't be empty. Make sure there's something in the previous input buffer. previous_input_buffer.ensure_nonempty(); @@ -1986,9 +1875,9 @@ pub fn collect_meshes_for_gpu_building( ref mut changed, ref mut removed, } => { - let prepared_tx = prepared_tx.clone(); - let reextract_tx = reextract_tx.clone(); - let removed_tx = removed_tx.clone(); + let mut prepared_tx = prepared_tx.clone(); + let mut reextract_tx = reextract_tx.clone(); + let mut removed_tx = removed_tx.clone(); scope.spawn(async move { let _span = info_span!("prepared_mesh_producer").entered(); changed @@ -2005,14 +1894,18 @@ pub fn collect_meshes_for_gpu_building( frame_count, ) { Some(prepared) => { - prepared_tx.send((entity, prepared, None)); + prepared_tx + .send_blocking((entity, prepared, None)) + .unwrap(); + } + None => { + reextract_tx.send_blocking(entity).unwrap(); } - None => reextract_tx.send(entity), }, ); for entity in removed.drain(..) 
{ - removed_tx.send(entity); + removed_tx.send_blocking(entity).unwrap(); } }); } @@ -2021,9 +1914,9 @@ pub fn collect_meshes_for_gpu_building( ref mut changed, ref mut removed, } => { - let prepared_tx = prepared_tx.clone(); - let reextract_tx = reextract_tx.clone(); - let removed_tx = removed_tx.clone(); + let mut prepared_tx = prepared_tx.clone(); + let mut reextract_tx = reextract_tx.clone(); + let mut removed_tx = removed_tx.clone(); scope.spawn(async move { let _span = info_span!("prepared_mesh_producer").entered(); changed.drain(..).for_each( @@ -2038,19 +1931,23 @@ pub fn collect_meshes_for_gpu_building( frame_count, ) { Some(prepared) => { - prepared_tx.send(( - entity, - prepared, - Some(mesh_culling_builder), - )); + prepared_tx + .send_blocking(( + entity, + prepared, + Some(mesh_culling_builder), + )) + .unwrap(); + } + None => { + reextract_tx.send_blocking(entity).unwrap(); } - None => reextract_tx.send(entity), } }, ); for entity in removed.drain(..) { - removed_tx.send(entity); + removed_tx.send_blocking(entity).unwrap(); } }); } diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index bc95f3e070580..5549ff88a8054 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -12,7 +12,7 @@ keywords = ["bevy"] default = ["parallel"] # Provides access to the `Parallel` type. -parallel = ["bevy_platform/std", "dep:thread_local"] +parallel = ["bevy_platform/std", "dep:thread_local", "dep:async-channel"] std = ["disqualified/alloc"] @@ -23,9 +23,13 @@ bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-fea disqualified = { version = "1.0", default-features = false } thread_local = { version = "1.0", optional = true } +async-channel = { version = "2.3.0", optional = true } [dev-dependencies] static_assertions = "1.1.0" +bevy_ecs = { path = "../bevy_ecs", version = "0.18.0-dev", default-features = false } +bevy_app = { path = "../bevy_app", version = "0.18.0-dev", default-features = false } +bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", default-features = false } [lints] workspace = true diff --git a/crates/bevy_utils/src/buffered_channel.rs b/crates/bevy_utils/src/buffered_channel.rs new file mode 100644 index 0000000000000..c7321ae36a08b --- /dev/null +++ b/crates/bevy_utils/src/buffered_channel.rs @@ -0,0 +1,279 @@ +use crate::Parallel; +use alloc::vec::Vec; +use async_channel::{Receiver, Sender}; +use core::ops::{Deref, DerefMut}; + +/// An asynchronous MPSC channel that buffers messages and reuses allocations with thread locals. +/// +/// This is a building block for efficient parallel worker tasks. +/// +/// Cache this channel in a system's [`Local`] to reuse allocated memory. +/// +/// This is faster than sending each message individually into a channel when communicating between +/// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are +/// concurrently sending data into the channel, enabling you to run a serial processing consumer +/// at the same time as many parallel processing producers. 
+///
+/// # Usage
+///
+/// ```
+/// use bevy_utils::BufferedChannel;
+/// use bevy_app::{App, TaskPoolPlugin, Update};
+/// use bevy_ecs::system::Local;
+/// use bevy_tasks::ComputeTaskPool;
+///
+/// App::new()
+///     .add_plugins(TaskPoolPlugin::default())
+///     .add_systems(Update, parallel_system)
+///     .update();
+///
+/// fn parallel_system(channel: Local<BufferedChannel<u64>>) {
+///     let (rx, tx) = channel.unbounded();
+///     ComputeTaskPool::get().scope(|scope| {
+///         // Spawn a single consumer task that reads from the producers. Note we can spawn this
+///         // first and have it immediately start processing the messages produced in parallel.
+///         // Because we are receiving asynchronously, we avoid deadlocks even on a single thread.
+///         scope.spawn(async move {
+///             let mut total = 0;
+///             let mut count = 0;
+///             while let Ok(chunk) = rx.recv().await {
+///                 count += chunk.len();
+///                 total += chunk.iter().sum::<u64>();
+///             }
+///             assert_eq!(count, 500_000);
+///             assert_eq!(total, 24_999_750_000);
+///         });
+///
+///         // Spawn a few producing tasks in parallel that send data into the buffered channel.
+///         for _ in 0..5 {
+///             let mut tx = tx.clone();
+///             scope.spawn(async move {
+///                 // Because this is buffered, we can iterate over hundreds of thousands of
+///                 // entities in each task while avoiding allocation and channel overhead.
+///                 // The buffer is flushed periodically, sending chunks of data to the receiver.
+///                 for i in 0..100_000 {
+///                     tx.send(i).await;
+///                 }
+///             });
+///         }
+///
+///         // Drop the unused sender so the channel can close.
+///         drop(tx);
+///     });
+/// }
+/// ```
+pub struct BufferedChannel<T: Send> {
+    /// The minimum length of a `Vec` of buffered data before it is sent through the channel.
+    pub chunk_size: usize,
+    /// A pool of reusable vectors to minimize allocations.
+    pool: Parallel<Vec<Vec<T>>>,
+}
+
+impl<T: Send> Default for BufferedChannel<T> {
+    fn default() -> Self {
+        Self {
+            // This was tuned based on benchmarks across a wide range of sizes.
+            chunk_size: 1024,
+            pool: Parallel::default(),
+        }
+    }
+}
+
+/// A wrapper around a [`Receiver`] that returns [`RecycledVec`]s to automatically return
+/// buffers to the [`BufferedChannel`] pool.
+pub struct BufferedReceiver<'a, T: Send> {
+    channel: &'a BufferedChannel<T>,
+    rx: Receiver<Vec<T>>,
+}
+
+impl<'a, T: Send> BufferedReceiver<'a, T> {
+    /// Receive a message asynchronously.
+    ///
+    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
+    pub async fn recv(&self) -> Result<RecycledVec<'a, T>, async_channel::RecvError> {
+        let buffer = self.rx.recv().await?;
+        Ok(RecycledVec {
+            buffer: Some(buffer),
+            channel: self.channel,
+        })
+    }
+
+    /// Receive a message blocking.
+    ///
+    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
+    pub fn recv_blocking(&self) -> Result<RecycledVec<'a, T>, async_channel::RecvError> {
+        let buffer = self.rx.recv_blocking()?;
+        Ok(RecycledVec {
+            buffer: Some(buffer),
+            channel: self.channel,
+        })
+    }
+}
+
+/// A wrapper around a `Vec` that automatically returns it to the [`BufferedChannel`]'s pool when
+/// dropped.
+pub struct RecycledVec<'a, T: Send> {
+    buffer: Option<Vec<T>>,
+    channel: &'a BufferedChannel<T>,
+}
+
+impl<'a, T: Send> Deref for RecycledVec<'a, T> {
+    type Target = [T];
+    fn deref(&self) -> &Self::Target {
+        self.buffer.as_ref().unwrap()
+    }
+}
+
+impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.buffer.as_mut().unwrap()
+    }
+}
+
+impl<'a, T: Send> IntoIterator for RecycledVec<'a, T> {
+    type Item = T;
+    type IntoIter = alloc::vec::IntoIter<T>;
+
+    fn into_iter(mut self) -> Self::IntoIter {
+        self.buffer.take().unwrap().into_iter()
+    }
+}
+
+impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
+    type Item = &'b T;
+    type IntoIter = core::slice::Iter<'b, T>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.buffer.as_ref().unwrap().iter()
+    }
+}
+
+impl<'a, 'b, T: Send> IntoIterator for &'b mut RecycledVec<'a, T> {
+    type Item = &'b mut T;
+    type IntoIter = core::slice::IterMut<'b, T>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.buffer.as_mut().unwrap().iter_mut()
+    }
+}
+
+impl<'a, T: Send> Drop for RecycledVec<'a, T> {
+    fn drop(&mut self) {
+        if let Some(mut buffer) = self.buffer.take() {
+            buffer.clear();
+            self.channel.pool.borrow_local_mut().push(buffer);
+        }
+    }
+}
+
+/// A [`BufferedChannel`] sender that buffers messages locally, flushing them when the sender is
+/// dropped or [`BufferedChannel::chunk_size`] is reached.
+pub struct BufferedSender<'a, T: Send> {
+    channel: &'a BufferedChannel<T>,
+    /// We use an `Option` to lazily allocate the buffer or pull from the channel's buffer pool.
+    buffer: Option<Vec<T>>,
+    tx: Sender<Vec<T>>,
+}
+
+impl<T: Send> BufferedChannel<T> {
+    fn get_buffer(&self) -> Vec<T> {
+        self.pool
+            .borrow_local_mut()
+            .pop()
+            .unwrap_or_else(|| Vec::with_capacity(self.chunk_size))
+    }
+
+    /// Create an unbounded channel and return the receiver and sender.
+    ///
+    /// The created channel can hold an unlimited number of messages.
+    pub fn unbounded(&self) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
+        let (tx, rx) = async_channel::unbounded();
+        (
+            BufferedReceiver { channel: self, rx },
+            BufferedSender {
+                channel: self,
+                buffer: None,
+                tx,
+            },
+        )
+    }
+
+    /// Create a bounded channel and return the receiver and sender.
+    ///
+    /// The created channel has space to hold at most `cap` messages at a time.
+    ///
+    /// # Panics
+    ///
+    /// Capacity must be a positive number. If `cap` is zero, this function will panic.
+    pub fn bounded(&self, cap: usize) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
+        let (tx, rx) = async_channel::bounded(cap);
+        (
+            BufferedReceiver { channel: self, rx },
+            BufferedSender {
+                channel: self,
+                buffer: None,
+                tx,
+            },
+        )
+    }
+}
+
+impl<'a, T: Send> BufferedSender<'a, T> {
+    /// Send a message asynchronously.
+    ///
+    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
+    /// messages are accumulated or the sender is dropped.
+    pub async fn send(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
+        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
+        buffer.push(msg);
+        if buffer.len() >= self.channel.chunk_size {
+            let full_buffer = self.buffer.take().unwrap();
+            self.tx.send(full_buffer).await?;
+        }
+        Ok(())
+    }
+
+    /// Send an item blocking.
+    ///
+    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
+    /// messages are accumulated or the sender is dropped.
+    pub fn send_blocking(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
+        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
+        buffer.push(msg);
+        if buffer.len() >= self.channel.chunk_size {
+            let full_buffer = self.buffer.take().unwrap();
+            self.tx.send_blocking(full_buffer)?;
+        }
+        Ok(())
+    }
+
+    /// Flush any remaining messages in the local buffer, sending them into the channel.
+    fn flush(&mut self) {
+        if let Some(buffer) = self.buffer.take() {
+            if !buffer.is_empty() {
+                // The allocation is sent through the channel and will be reused when dropped.
+                let _ = self.tx.send_blocking(buffer);
+            } else {
+                // If it's empty, just return it to the pool.
+                self.channel.pool.borrow_local_mut().push(buffer);
+            }
+        }
+    }
+}
+
+impl<'a, T: Send> Clone for BufferedSender<'a, T> {
+    fn clone(&self) -> Self {
+        Self {
+            channel: self.channel,
+            buffer: None,
+            tx: self.tx.clone(),
+        }
+    }
+}
+
+/// Automatically flush the buffer when a sender is dropped.
+impl<'a, T: Send> Drop for BufferedSender<'a, T> {
+    fn drop(&mut self) {
+        self.flush();
+    }
+}
diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs
index c1ea69b41a0d9..6d6185a13e7e1 100644
--- a/crates/bevy_utils/src/lib.rs
+++ b/crates/bevy_utils/src/lib.rs
@@ -37,8 +37,11 @@ cfg::alloc! {
 cfg::parallel! {
     mod parallel_queue;
     pub use parallel_queue::*;
+    mod buffered_channel;
+    pub use buffered_channel::*;
 }
 
+
 /// The utilities prelude.
 ///
 /// This includes the most common types in this crate, re-exported for your convenience.

From 9d8680a69e2c72c0c359082b1df660aa5f3ad3f8 Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Thu, 1 Jan 2026 20:39:50 -0800
Subject: [PATCH 14/21] Correctly instrument async closure, add doc comments to
 gpu struct

---
 crates/bevy_pbr/src/render/mesh.rs | 119 ++++++++++++-----------------
 1 file changed, 49 insertions(+), 70 deletions(-)

diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs
index 4c7bc9a80982d..d14da20dbe2f2 100644
--- a/crates/bevy_pbr/src/render/mesh.rs
+++ b/crates/bevy_pbr/src/render/mesh.rs
@@ -65,7 +65,7 @@ use bevy_utils::{default, BufferedChannel, Parallel, TypeIdMap};
 use core::any::TypeId;
 use core::mem::size_of;
 use material_bind_groups::MaterialBindingId;
-use tracing::{error, info_span, warn};
+use tracing::{error, info_span, warn, Instrument};
 
 use self::irradiance_volume::IRRADIANCE_VOLUMES_ARE_USABLE;
 use crate::{
@@ -1218,8 +1218,11 @@ impl RenderMeshInstanceGpuBuilder {
 }
 
 pub struct RenderMeshInstanceGpuPrepared {
+    /// Data shared between the CPU and GPU versions of this mesh instance.
     shared: RenderMeshInstanceShared,
+    /// The data that will be uploaded to the GPU as a [`MeshInputUniform`].
     mesh_input_uniform: MeshInputUniform,
+    /// The world-space center of the mesh instance, used for culling and sorting.
     center: Vec3,
 }
 
@@ -1800,47 +1803,6 @@ pub fn collect_meshes_for_gpu_building(
     let (reextract_rx, reextract_tx) = chunks.reextract.unbounded();
     let (removed_rx, removed_tx) = chunks.removed.unbounded();
 
-    // A single worker that consumes the meshes prepared in parallel by multiple producers.
-    // This part of the workload cannot be parallelized due to the shared mutable state.
-    let mesh_consumer_worker = &mut Some(move || {
-        let _span = info_span!("prepared_mesh_consumer").entered();
-        // In an ideal world, these three for loops would be a single loop, so we can process
-        // incoming batches no matter which channel they come into, instead of one at a time.
- // Probably doable using async_channel and awaiting all three receivers at once. - while let Ok(batch) = prepared_rx.recv_blocking() { - for (entity, prepared, mesh_culling_builder) in batch { - let Some(instance_data_index) = prepared.update( - entity, - &mut *render_mesh_instances, - current_input_buffer, - previous_input_buffer, - ) else { - continue; - }; - if let Some(mesh_culling_data) = mesh_culling_builder { - mesh_culling_data - .update(&mut mesh_culling_data_buffer, instance_data_index as usize); - } - } - } - while let Ok(batch) = removed_rx.recv_blocking() { - for entity in batch { - remove_mesh_input_uniform( - entity, - &mut *render_mesh_instances, - current_input_buffer, - ); - } - } - while let Ok(batch) = reextract_rx.recv_blocking() { - for entity in batch { - meshes_to_reextract_next_frame.insert(entity); - } - } - // Buffers can't be empty. Make sure there's something in the previous input buffer. - previous_input_buffer.ensure_nonempty(); - }); - // Reference data shared between tasks let mesh_allocator = &mesh_allocator; let mesh_material_ids = &mesh_material_ids; @@ -1850,18 +1812,49 @@ pub fn collect_meshes_for_gpu_building( let frame_count = *frame_count; // Spawn workers on the taskpool to prepare and update meshes in parallel. - let taskpool = ComputeTaskPool::get(); - let is_single_threaded = taskpool.thread_num() == 1; - taskpool.scope(|scope| { - // If we have multiple threads available, we can spawn a dedicated consumer worker before - // the parallel producers start sending data without any risk of deadlocking. - // + ComputeTaskPool::get().scope(|scope| { // This worker is the bottleneck of mesh preparation and can only run serially, so we want // it to start working immediately. As soon as the parallel workers produce chunks of // prepared meshes, this worker will consume them and update the GPU buffers. - if !is_single_threaded && let Some(mut consumer_worker) = mesh_consumer_worker.take() { - scope.spawn(async move { consumer_worker() }); - } + scope.spawn( + async move { + while let Ok(batch) = prepared_rx.recv().await { + for (entity, prepared, mesh_culling_builder) in batch { + let Some(instance_data_index) = prepared.update( + entity, + &mut *render_mesh_instances, + current_input_buffer, + previous_input_buffer, + ) else { + continue; + }; + if let Some(mesh_culling_data) = mesh_culling_builder { + mesh_culling_data.update( + &mut mesh_culling_data_buffer, + instance_data_index as usize, + ); + } + } + } + while let Ok(batch) = removed_rx.recv().await { + for entity in batch { + remove_mesh_input_uniform( + entity, + &mut *render_mesh_instances, + current_input_buffer, + ); + } + } + while let Ok(batch) = reextract_rx.recv().await { + for entity in batch { + meshes_to_reextract_next_frame.insert(entity); + } + } + // Buffers can't be empty. Make sure there's something in the previous input buffer. + previous_input_buffer.ensure_nonempty(); + } + .instrument(info_span!("collect_meshes_consumer")), + ); // Iterate through each queue, spawning a task for each queue. This loop completes quickly // as it does very little work, it is just spawning and moving data into tasks in a loop. 
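[Editor's note] The hunk above replaces the deferred `mesh_consumer_worker` closure with a consumer task spawned directly into the same scope as the producers; because it awaits the receivers instead of blocking, the scope can interleave it with the producer tasks and the old single-threaded fallback is no longer needed. Below is a minimal, self-contained sketch of that consumer-first pattern. It is an illustration only, not part of the patch: it uses plain `async_channel` rather than `BufferedChannel`, and assumes `bevy_tasks` and `async-channel` as direct dependencies.

```rust
use bevy_tasks::{ComputeTaskPool, TaskPool};

fn main() {
    let pool = ComputeTaskPool::get_or_init(TaskPool::new);
    let (tx, rx) = async_channel::unbounded::<u32>();

    pool.scope(|scope| {
        // The consumer is spawned first; `recv().await` yields instead of blocking the thread,
        // which is what lets the producers make progress even on a single-threaded pool.
        scope.spawn(async move {
            let mut total = 0;
            while let Ok(value) = rx.recv().await {
                total += value;
            }
            // The loop ends once every sender has been dropped and the channel is drained.
            assert_eq!(total, 4950);
        });

        // Producers run in parallel and send into the channel.
        for chunk in 0..10u32 {
            let tx = tx.clone();
            scope.spawn(async move {
                for i in (chunk * 10)..(chunk * 10 + 10) {
                    tx.send(i).await.ok();
                }
            });
        }

        // Drop the original sender so the channel can close.
        drop(tx);
    });
}
```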
@@ -1894,12 +1887,10 @@ pub fn collect_meshes_for_gpu_building( frame_count, ) { Some(prepared) => { - prepared_tx - .send_blocking((entity, prepared, None)) - .unwrap(); + prepared_tx.send_blocking((entity, prepared, None)).ok(); } None => { - reextract_tx.send_blocking(entity).unwrap(); + reextract_tx.send_blocking(entity).ok(); } }, ); @@ -1931,16 +1922,11 @@ pub fn collect_meshes_for_gpu_building( frame_count, ) { Some(prepared) => { - prepared_tx - .send_blocking(( - entity, - prepared, - Some(mesh_culling_builder), - )) - .unwrap(); + let data = (entity, prepared, Some(mesh_culling_builder)); + prepared_tx.send_blocking(data).ok(); } None => { - reextract_tx.send_blocking(entity).unwrap(); + reextract_tx.send_blocking(entity).ok(); } } }, @@ -1961,13 +1947,6 @@ pub fn collect_meshes_for_gpu_building( drop(reextract_tx); drop(removed_tx); }); - - // In single threaded scenarios, we cannot spawn the worker first because it will deadlock, - // The worker would block the thread and prevent the mesh prepare tasks from ever starting and - // sending any data to be consumed. - if is_single_threaded && let Some(mut consumer_worker) = mesh_consumer_worker.take() { - consumer_worker(); - } } /// All data needed to construct a pipeline for rendering 3D meshes. From fa4f1d327328807412129482427541cdfe08c51e Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Thu, 1 Jan 2026 21:14:18 -0800 Subject: [PATCH 15/21] fmt --- crates/bevy_utils/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index 6d6185a13e7e1..0765b10447026 100644 --- a/crates/bevy_utils/src/lib.rs +++ b/crates/bevy_utils/src/lib.rs @@ -41,7 +41,6 @@ cfg::parallel! { pub use buffered_channel::*; } - /// The utilities prelude. /// /// This includes the most common types in this crate, re-exported for your convenience. From 7b2e3cb132025dbb29d3997e80c318dc19be00d0 Mon Sep 17 00:00:00 2001 From: Aevyrie Roessler Date: Thu, 1 Jan 2026 22:37:36 -0800 Subject: [PATCH 16/21] Consolidate block_on impl to fix WASM build error --- crates/bevy_platform/Cargo.toml | 8 +++++ crates/bevy_platform/src/future.rs | 32 ++++++++++++++++++ crates/bevy_platform/src/lib.rs | 1 + crates/bevy_tasks/Cargo.toml | 5 ++- crates/bevy_tasks/src/lib.rs | 40 +---------------------- crates/bevy_utils/Cargo.toml | 6 ++-- crates/bevy_utils/src/buffered_channel.rs | 10 ++++++ crates/bevy_utils/src/lib.rs | 7 ++++ 8 files changed, 65 insertions(+), 44 deletions(-) create mode 100644 crates/bevy_platform/src/future.rs diff --git a/crates/bevy_platform/Cargo.toml b/crates/bevy_platform/Cargo.toml index e17d5a4bc0c72..9c44918ad4e49 100644 --- a/crates/bevy_platform/Cargo.toml +++ b/crates/bevy_platform/Cargo.toml @@ -21,6 +21,12 @@ rayon = ["dep:rayon", "hashbrown/rayon"] # Platform Compatibility +## Provides an implementation of `block_on` from `futures-lite`. +futures-lite = ["std", "dep:futures-lite", "futures-lite?/std"] + +## Provides an implementation of `block_on` from `async-io`. +async-io = ["std", "dep:async-io"] + ## Allows access to the `std` crate. Enabling this feature will prevent compilation ## on `no_std` targets, but provides access to certain additional features on ## supported platforms. 
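[Editor's note] The two features above only select which backend `bevy_platform::future::block_on` (added in the hunks that follow) re-exports; if neither is enabled, the busy-wait fallback is compiled instead, and call sites look the same either way. A minimal usage sketch, assuming a crate that depends on `bevy_platform`:

```rust
use bevy_platform::future::block_on;

fn main() {
    // Drives the future to completion on the current thread. Which implementation runs
    // (async-io, futures-lite, or the busy-wait fallback) depends on the enabled features.
    let value = block_on(async { 2 + 2 });
    assert_eq!(value, 4);
}
```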
@@ -72,6 +78,8 @@ hashbrown = { version = "0.16.1", features = [
 ], optional = true, default-features = false }
 serde = { version = "1", default-features = false, optional = true }
 rayon = { version = "1", default-features = false, optional = true }
+futures-lite = { version = "2.0.1", default-features = false, optional = true }
+async-io = { version = "2.0.0", optional = true }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 web-time = { version = "1.1", default-features = false, optional = true }
diff --git a/crates/bevy_platform/src/future.rs b/crates/bevy_platform/src/future.rs
new file mode 100644
index 0000000000000..fe77c793f2d94
--- /dev/null
+++ b/crates/bevy_platform/src/future.rs
@@ -0,0 +1,32 @@
+//! Platform-aware future utilities.
+
+crate::cfg::switch! {
+    #[cfg(feature = "async-io")] => {
+        pub use async_io::block_on;
+    }
+    #[cfg(feature = "futures-lite")] => {
+        pub use futures_lite::future::block_on;
+    }
+    _ => {
+        /// Blocks on the supplied `future`.
+        /// This implementation will busy-wait until it is completed.
+        /// Consider enabling the `async-io` or `futures-lite` features.
+        pub fn block_on<T>(future: impl core::future::Future<Output = T>) -> T {
+            use core::task::{Poll, Context};
+
+            // Pin the future on the stack.
+            let mut future = core::pin::pin!(future);
+
+            // We don't care about the waker as we're just going to poll as fast as possible.
+            let cx = &mut Context::from_waker(core::task::Waker::noop());
+
+            // Keep polling until the future is ready.
+            loop {
+                match future.as_mut().poll(cx) {
+                    Poll::Ready(output) => return output,
+                    Poll::Pending => core::hint::spin_loop(),
+                }
+            }
+        }
+    }
+}
diff --git a/crates/bevy_platform/src/lib.rs b/crates/bevy_platform/src/lib.rs
index 4c0b4e4621163..b4ff694f1187f 100644
--- a/crates/bevy_platform/src/lib.rs
+++ b/crates/bevy_platform/src/lib.rs
@@ -21,6 +21,7 @@ cfg::alloc! {
 
 pub mod cell;
 pub mod cfg;
+pub mod future;
 pub mod hash;
 pub mod sync;
 pub mod thread;
diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml
index 80d77fa2dcd5b..b90c827989fc5 100644
--- a/crates/bevy_tasks/Cargo.toml
+++ b/crates/bevy_tasks/Cargo.toml
@@ -25,11 +25,11 @@ multi_threaded = [
 async_executor = ["bevy_platform/std", "dep:async-executor", "futures-lite"]
 
 # Provide an implementation of `block_on` from `futures-lite`.
-futures-lite = ["bevy_platform/std", "futures-lite/std"]
+futures-lite = ["bevy_platform/futures-lite"]
 
 # Use async-io's implementation of block_on instead of futures-lite's implementation.
 # This is preferred if your application uses async-io.
-async-io = ["bevy_platform/std", "dep:async-io"] +async-io = ["bevy_platform/async-io"] [dependencies] bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-features = false, features = [ @@ -46,7 +46,6 @@ derive_more = { version = "2", default-features = false, features = [ ] } async-executor = { version = "1.11", optional = true } async-channel = { version = "2.3.0", optional = true } -async-io = { version = "2.0.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } atomic-waker = { version = "1", default-features = false } crossbeam-queue = { version = "0.3", default-features = false, features = [ diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 4ef156170a87e..da1a1a0538017 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -28,15 +28,6 @@ pub mod cfg { conditional_send } - #[cfg(feature = "async-io")] => { - /// Indicates `async-io` will be used for the implementation of `block_on`. - async_io - } - - #[cfg(feature = "futures-lite")] => { - /// Indicates `futures-lite` will be used for the implementation of `block_on`. - futures_lite - } } } @@ -114,36 +105,7 @@ cfg::multi_threaded! { } } -cfg::switch! { - cfg::async_io => { - pub use async_io::block_on; - } - cfg::futures_lite => { - pub use futures_lite::future::block_on; - } - _ => { - /// Blocks on the supplied `future`. - /// This implementation will busy-wait until it is completed. - /// Consider enabling the `async-io` or `futures-lite` features. - pub fn block_on(future: impl Future) -> T { - use core::task::{Poll, Context}; - - // Pin the future on the stack. - let mut future = core::pin::pin!(future); - - // We don't care about the waker as we're just going to poll as fast as possible. - let cx = &mut Context::from_waker(core::task::Waker::noop()); - - // Keep polling until the future is ready. - loop { - match future.as_mut().poll(cx) { - Poll::Ready(output) => return output, - Poll::Pending => core::hint::spin_loop(), - } - } - } - } -} +pub use bevy_platform::future::block_on; /// The tasks prelude. /// diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index 5549ff88a8054..3b6816262af76 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -9,10 +9,12 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["parallel"] +default = ["parallel", "buffered_channel"] # Provides access to the `Parallel` type. -parallel = ["bevy_platform/std", "dep:thread_local", "dep:async-channel"] +parallel = ["bevy_platform/std", "dep:thread_local"] + +buffered_channel = ["bevy_platform/std", "dep:async-channel"] std = ["disqualified/alloc"] diff --git a/crates/bevy_utils/src/buffered_channel.rs b/crates/bevy_utils/src/buffered_channel.rs index c7321ae36a08b..ea9ebf4132317 100644 --- a/crates/bevy_utils/src/buffered_channel.rs +++ b/crates/bevy_utils/src/buffered_channel.rs @@ -102,7 +102,11 @@ impl<'a, T: Send> BufferedReceiver<'a, T> { /// /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped. 
     pub fn recv_blocking(&self) -> Result<RecycledVec<'a, T>, async_channel::RecvError> {
+        #[cfg(all(feature = "std", not(target_family = "wasm")))]
         let buffer = self.rx.recv_blocking()?;
+        #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+        let buffer = bevy_platform::future::block_on(self.rx.recv())?;
+
         Ok(RecycledVec {
             buffer: Some(buffer),
             channel: self.channel,
@@ -242,7 +246,10 @@ impl<'a, T: Send> BufferedSender<'a, T> {
         buffer.push(msg);
         if buffer.len() >= self.channel.chunk_size {
             let full_buffer = self.buffer.take().unwrap();
+            #[cfg(all(feature = "std", not(target_family = "wasm")))]
             self.tx.send_blocking(full_buffer)?;
+            #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+            bevy_platform::future::block_on(self.tx.send(full_buffer))?;
         }
         Ok(())
     }
@@ -252,7 +259,10 @@ impl<'a, T: Send> BufferedSender<'a, T> {
         if let Some(buffer) = self.buffer.take() {
             if !buffer.is_empty() {
                 // The allocation is sent through the channel and will be reused when dropped.
+                #[cfg(all(feature = "std", not(target_family = "wasm")))]
                 let _ = self.tx.send_blocking(buffer);
+                #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+                let _ = bevy_platform::future::block_on(self.tx.send(buffer));
             } else {
                 // If it's empty, just return it to the pool.
                 self.channel.pool.borrow_local_mut().push(buffer);
diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs
index 0765b10447026..43217d3eed62a 100644
--- a/crates/bevy_utils/src/lib.rs
+++ b/crates/bevy_utils/src/lib.rs
@@ -20,6 +20,10 @@ pub mod cfg {
             /// Indicates the `Parallel` type is available.
             parallel
         }
+        #[cfg(feature = "buffered_channel")] => {
+            /// Indicates the `BufferedChannel` type is available.
+            buffered_channel
+        }
     }
 }
 
@@ -37,6 +41,9 @@ cfg::alloc! {
 cfg::parallel! {
     mod parallel_queue;
     pub use parallel_queue::*;
+}
+
+cfg::buffered_channel! {
     mod buffered_channel;
     pub use buffered_channel::*;
 }

From 716e6320582ddfbcfb5209cf1965d47db226b697 Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Thu, 1 Jan 2026 22:51:49 -0800
Subject: [PATCH 17/21] Remove unnecessary qualification

---
 crates/bevy_platform/src/future.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/bevy_platform/src/future.rs b/crates/bevy_platform/src/future.rs
index fe77c793f2d94..e733183752ae9 100644
--- a/crates/bevy_platform/src/future.rs
+++ b/crates/bevy_platform/src/future.rs
@@ -11,7 +11,7 @@ crate::cfg::switch! {
         /// Blocks on the supplied `future`.
         /// This implementation will busy-wait until it is completed.
         /// Consider enabling the `async-io` or `futures-lite` features.
-        pub fn block_on<T>(future: impl core::future::Future<Output = T>) -> T {
+        pub fn block_on<T>(future: impl Future<Output = T>) -> T {
             use core::task::{Poll, Context};
 
             // Pin the future on the stack.

From 809bc66c7a4e29f12f554f00a672390a78fa1dba Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Thu, 1 Jan 2026 23:17:57 -0800
Subject: [PATCH 18/21] Fix doc link

---
 crates/bevy_utils/src/buffered_channel.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/bevy_utils/src/buffered_channel.rs b/crates/bevy_utils/src/buffered_channel.rs
index ea9ebf4132317..fd58c0b85037d 100644
--- a/crates/bevy_utils/src/buffered_channel.rs
+++ b/crates/bevy_utils/src/buffered_channel.rs
@@ -7,7 +7,7 @@ use core::ops::{Deref, DerefMut};
 ///
 /// This is a building block for efficient parallel worker tasks.
 ///
-/// Cache this channel in a system's [`Local`] to reuse allocated memory.
+/// Cache this channel in a system's `Local` to reuse allocated memory.
 ///
 /// This is faster than sending each message individually into a channel when communicating between
 /// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are

From 69f235ef89b85c89dea63d71169b11c86b2218df Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Fri, 2 Jan 2026 02:18:36 -0800
Subject: [PATCH 19/21] Fix vecs not being reused when consumed by IntoIterator

---
 crates/bevy_pbr/src/render/mesh.rs        | 12 ++++++------
 crates/bevy_utils/src/buffered_channel.rs | 19 +++++++++----------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs
index d14da20dbe2f2..c39cc84107edd 100644
--- a/crates/bevy_pbr/src/render/mesh.rs
+++ b/crates/bevy_pbr/src/render/mesh.rs
@@ -1818,8 +1818,8 @@ pub fn collect_meshes_for_gpu_building(
             // prepared meshes, this worker will consume them and update the GPU buffers.
             scope.spawn(
                 async move {
-                    while let Ok(batch) = prepared_rx.recv().await {
-                        for (entity, prepared, mesh_culling_builder) in batch {
+                    while let Ok(mut batch) = prepared_rx.recv().await {
+                        for (entity, prepared, mesh_culling_builder) in batch.drain() {
                             let Some(instance_data_index) = prepared.update(
                                 entity,
                                 &mut *render_mesh_instances,
@@ -1836,8 +1836,8 @@ pub fn collect_meshes_for_gpu_building(
                             }
                         }
                     }
-                    while let Ok(batch) = removed_rx.recv().await {
-                        for entity in batch {
+                    while let Ok(mut batch) = removed_rx.recv().await {
+                        for entity in batch.drain() {
                             remove_mesh_input_uniform(
                                 entity,
                                 &mut *render_mesh_instances,
@@ -1845,8 +1845,8 @@ pub fn collect_meshes_for_gpu_building(
                             );
                         }
                     }
-                    while let Ok(batch) = reextract_rx.recv().await {
-                        for entity in batch {
+                    while let Ok(mut batch) = reextract_rx.recv().await {
+                        for entity in batch.drain() {
                             meshes_to_reextract_next_frame.insert(entity);
                         }
                     }
diff --git a/crates/bevy_utils/src/buffered_channel.rs b/crates/bevy_utils/src/buffered_channel.rs
index fd58c0b85037d..e717380b7ddfa 100644
--- a/crates/bevy_utils/src/buffered_channel.rs
+++ b/crates/bevy_utils/src/buffered_channel.rs
@@ -36,7 +36,7 @@ use core::ops::{Deref, DerefMut};
 /// scope.spawn(async move {
 ///     let mut total = 0;
 ///     let mut count = 0;
-///     while let Ok(chunk) = rx.recv().await {
+///     while let Ok(mut chunk) = rx.recv().await {
 ///         count += chunk.len();
 ///         total += chunk.iter().sum::<u64>();
@@ -121,6 +121,14 @@ pub struct RecycledVec<'a, T: Send> {
     channel: &'a BufferedChannel<T>,
 }
 
+impl<'a, T: Send> RecycledVec<'a, T> {
+    /// Drains the elements from the buffer as an iterator, keeping the allocation
+    /// so it can be recycled when this [`RecycledVec`] is dropped.
+    pub fn drain(&mut self) -> alloc::vec::Drain<'_, T> {
+        self.buffer.as_mut().unwrap().drain(..)
+    }
+}
+
 impl<'a, T: Send> Deref for RecycledVec<'a, T> {
     type Target = [T];
     fn deref(&self) -> &Self::Target {
@@ -134,15 +142,6 @@ impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
     }
 }
 
-impl<'a, T: Send> IntoIterator for RecycledVec<'a, T> {
-    type Item = T;
-    type IntoIter = alloc::vec::IntoIter<T>;
-
-    fn into_iter(mut self) -> Self::IntoIter {
-        self.buffer.take().unwrap().into_iter()
-    }
-}
-
 impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
     type Item = &'b T;
     type IntoIter = core::slice::Iter<'b, T>;

From 9e4f42f42dedd5ccba7e2033fcb28c037cf96269 Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Fri, 16 Jan 2026 18:25:16 -0800
Subject: [PATCH 20/21] Fix bevy_tasks dep missing

---
 crates/bevy_pbr/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/bevy_pbr/Cargo.toml b/crates/bevy_pbr/Cargo.toml
index 42f615159a480..1666c03c1043b 100644
--- a/crates/bevy_pbr/Cargo.toml
+++ b/crates/bevy_pbr/Cargo.toml
@@ -56,7 +56,7 @@ bevy_render = { path = "../bevy_render", version = "0.19.0-dev", features = [
     "morph",
 ] }
 bevy_camera = { path = "../bevy_camera", version = "0.19.0-dev" }
-bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev", optional = true }
+bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev" }
 bevy_transform = { path = "../bevy_transform", version = "0.19.0-dev" }
 bevy_utils = { path = "../bevy_utils", version = "0.19.0-dev" }
 bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-features = false, features = [

From 2cc67d23c9f2b098fd33a3c3bb0558b8aba1c566 Mon Sep 17 00:00:00 2001
From: Aevyrie Roessler
Date: Fri, 16 Jan 2026 22:54:17 -0800
Subject: [PATCH 21/21] chore: Trigger CI/CD pipeline
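[Editor's note] As a closing illustration of the receive side once the whole series is applied: after PATCH 19, batches should be consumed with `drain()` so the underlying `Vec` survives the loop and is returned to the channel's pool when the `RecycledVec` drops. The sketch below is not part of the patches; `BufferedChannel<u64>` and the single-threaded producer loop are assumptions chosen only to keep the example self-contained.

```rust
use bevy_utils::BufferedChannel;

fn main() {
    let channel = BufferedChannel::<u64>::default();
    let (rx, mut tx) = channel.unbounded();

    // A stand-in producer; real code would hand clones of `tx` to parallel tasks.
    for i in 0..10_000u64 {
        tx.send_blocking(i).ok();
    }
    // Dropping the sender flushes the partial batch and lets the channel close.
    drop(tx);

    let mut total = 0;
    while let Ok(mut batch) = rx.recv_blocking() {
        // Drain instead of consuming the batch, so the now-empty Vec is recycled on drop.
        for value in batch.drain() {
            total += value;
        }
    }
    assert_eq!(total, 49_995_000);
}
```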