Skip to content

Commit d7b2e99

Browse files
committed
Wrap the AssetSources in a RwLock.
1 parent 6463391 commit d7b2e99

File tree

3 files changed

+53
-24
lines changed

3 files changed

+53
-24
lines changed

crates/bevy_asset/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ use bevy_ecs::{
225225
use bevy_platform::collections::HashSet;
226226
use bevy_reflect::{FromReflect, GetTypeRegistration, Reflect, TypePath};
227227
use core::any::TypeId;
228+
use std::sync::RwLock;
228229
use tracing::error;
229230

230231
/// Provides "asset" loading and processing functionality. An [`Asset`] is a "runtime value" that is loaded from an [`AssetSource`],
@@ -375,7 +376,7 @@ impl Plugin for AssetPlugin {
375376
let sources = builders.build_sources(watch, false, None);
376377

377378
app.insert_resource(AssetServer::new_with_meta_check(
378-
Arc::new(sources),
379+
Arc::new(RwLock::new(sources)),
379380
AssetServerMode::Unprocessed,
380381
self.meta_check.clone(),
381382
watch,
@@ -404,7 +405,7 @@ impl Plugin for AssetPlugin {
404405
let mut builders = app.world_mut().resource_mut::<AssetSourceBuilders>();
405406
let sources = builders.build_sources(false, watch, None);
406407
app.insert_resource(AssetServer::new_with_meta_check(
407-
Arc::new(sources),
408+
Arc::new(RwLock::new(sources)),
408409
AssetServerMode::Processed,
409410
AssetMetaCheck::Always,
410411
watch,

crates/bevy_asset/src/processor/mod.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ pub struct AssetProcessorData {
115115
processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
116116
/// Default processors for file extensions
117117
default_processors: RwLock<HashMap<Box<str>, &'static str>>,
118-
sources: Arc<AssetSources>,
118+
/// The asset sources for which this processor will process assets.
119+
sources: Arc<RwLock<AssetSources>>,
119120
}
120121

121122
/// The current state of processing, including the overall state and the state of all assets.
@@ -137,10 +138,10 @@ impl AssetProcessor {
137138
pub fn new(
138139
sources: &mut AssetSourceBuilders,
139140
watch_processed: bool,
140-
) -> (Self, Arc<AssetSources>) {
141+
) -> (Self, Arc<RwLock<AssetSources>>) {
141142
let state = Arc::new(ProcessingState::new());
142143
let sources = sources.build_sources(true, watch_processed, Some(state.clone()));
143-
let sources = Arc::new(sources);
144+
let sources = Arc::new(RwLock::new(sources));
144145

145146
let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
146147
// The asset processor uses its own asset server with its own id space
@@ -176,11 +177,15 @@ impl AssetProcessor {
176177
&self,
177178
id: impl Into<AssetSourceId<'a>>,
178179
) -> Result<Arc<AssetSource>, MissingAssetSourceError> {
179-
self.data.sources.get(id.into())
180+
self.data
181+
.sources
182+
.read()
183+
.unwrap_or_else(PoisonError::into_inner)
184+
.get(id.into())
180185
}
181186

182187
#[inline]
183-
pub fn sources(&self) -> &AssetSources {
188+
pub fn sources(&self) -> &RwLock<AssetSources> {
184189
&self.data.sources
185190
}
186191

@@ -233,11 +238,19 @@ impl AssetProcessor {
233238
let start_time = std::time::Instant::now();
234239
debug!("Processing Assets");
235240

236-
processor.initialize().await.unwrap();
241+
let sources = processor
242+
.sources()
243+
.read()
244+
.unwrap_or_else(PoisonError::into_inner)
245+
.iter_processed()
246+
.cloned()
247+
.collect::<Vec<_>>();
248+
249+
processor.initialize(&sources).await.unwrap();
237250

238251
let (new_task_sender, new_task_receiver) = async_channel::unbounded();
239252
processor
240-
.queue_initial_processing_tasks(&new_task_sender)
253+
.queue_initial_processing_tasks(&sources, &new_task_sender)
241254
.await;
242255

243256
// Once all the tasks are queued for the initial processing, start actually
@@ -260,17 +273,18 @@ impl AssetProcessor {
260273
debug!("Processing finished in {:?}", end_time - start_time);
261274

262275
debug!("Listening for changes to source assets");
263-
processor.spawn_source_change_event_listeners(&new_task_sender);
276+
processor.spawn_source_change_event_listeners(&sources, &new_task_sender);
264277
})
265278
.detach();
266279
}
267280

268281
/// Sends start task events for all assets in all processed sources into `sender`.
269282
async fn queue_initial_processing_tasks(
270283
&self,
284+
sources: &[Arc<AssetSource>],
271285
sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
272286
) {
273-
for source in self.sources().iter_processed() {
287+
for source in sources {
274288
self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
275289
.await
276290
.unwrap();
@@ -281,9 +295,10 @@ impl AssetProcessor {
281295
/// response.
282296
fn spawn_source_change_event_listeners(
283297
&self,
298+
sources: &[Arc<AssetSource>],
284299
sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
285300
) {
286-
for source in self.data.sources.iter_processed() {
301+
for source in sources {
287302
let Some(receiver) = source.event_receiver().cloned() else {
288303
continue;
289304
};
@@ -753,8 +768,8 @@ impl AssetProcessor {
753768
/// This info will later be used to determine whether or not to re-process an asset
754769
///
755770
/// This will validate transactions and recover failed transactions when necessary.
756-
async fn initialize(&self) -> Result<(), InitializeError> {
757-
self.validate_transaction_log_and_recover().await;
771+
async fn initialize(&self, sources: &[Arc<AssetSource>]) -> Result<(), InitializeError> {
772+
self.validate_transaction_log_and_recover(sources).await;
758773
let mut asset_infos = self.data.processing_state.asset_infos.write().await;
759774

760775
/// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
@@ -793,7 +808,7 @@ impl AssetProcessor {
793808
}
794809
}
795810

796-
for source in self.sources().iter_processed() {
811+
for source in sources {
797812
let Some(processed_reader) = source.ungated_processed_reader() else {
798813
continue;
799814
};
@@ -1129,7 +1144,7 @@ impl AssetProcessor {
11291144
Ok(ProcessResult::Processed(new_processed_info))
11301145
}
11311146

1132-
async fn validate_transaction_log_and_recover(&self) {
1147+
async fn validate_transaction_log_and_recover(&self, sources: &[Arc<AssetSource>]) {
11331148
let log_factory = self
11341149
.data
11351150
.log_factory
@@ -1203,7 +1218,7 @@ impl AssetProcessor {
12031218

12041219
if !state_is_valid {
12051220
error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
1206-
for source in self.sources().iter_processed() {
1221+
for source in sources {
12071222
let Ok(processed_writer) = source.processed_writer() else {
12081223
continue;
12091224
};
@@ -1226,7 +1241,10 @@ impl AssetProcessor {
12261241

12271242
impl AssetProcessorData {
12281243
/// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
1229-
pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
1244+
pub(crate) fn new(
1245+
sources: Arc<RwLock<AssetSources>>,
1246+
processing_state: Arc<ProcessingState>,
1247+
) -> Self {
12301248
AssetProcessorData {
12311249
processing_state,
12321250
sources,

crates/bevy_asset/src/server/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub(crate) struct AssetServerData {
6969
pub(crate) loaders: Arc<RwLock<AssetLoaders>>,
7070
asset_event_sender: Sender<InternalAssetEvent>,
7171
asset_event_receiver: Receiver<InternalAssetEvent>,
72-
sources: Arc<AssetSources>,
72+
sources: Arc<RwLock<AssetSources>>,
7373
mode: AssetServerMode,
7474
meta_check: AssetMetaCheck,
7575
unapproved_path_mode: UnapprovedPathMode,
@@ -91,7 +91,7 @@ impl AssetServer {
9191
/// Create a new instance of [`AssetServer`]. If `watch_for_changes` is true, the [`AssetReader`](crate::io::AssetReader) storage will watch for changes to
9292
/// asset sources and hot-reload them.
9393
pub fn new(
94-
sources: Arc<AssetSources>,
94+
sources: Arc<RwLock<AssetSources>>,
9595
mode: AssetServerMode,
9696
watching_for_changes: bool,
9797
unapproved_path_mode: UnapprovedPathMode,
@@ -109,7 +109,7 @@ impl AssetServer {
109109
/// Create a new instance of [`AssetServer`]. If `watch_for_changes` is true, the [`AssetReader`](crate::io::AssetReader) storage will watch for changes to
110110
/// asset sources and hot-reload them.
111111
pub fn new_with_meta_check(
112-
sources: Arc<AssetSources>,
112+
sources: Arc<RwLock<AssetSources>>,
113113
mode: AssetServerMode,
114114
meta_check: AssetMetaCheck,
115115
watching_for_changes: bool,
@@ -126,7 +126,7 @@ impl AssetServer {
126126
}
127127

128128
pub(crate) fn new_with_loaders(
129-
sources: Arc<AssetSources>,
129+
sources: Arc<RwLock<AssetSources>>,
130130
loaders: Arc<RwLock<AssetLoaders>>,
131131
mode: AssetServerMode,
132132
meta_check: AssetMetaCheck,
@@ -183,7 +183,11 @@ impl AssetServer {
183183
&self,
184184
source: impl Into<AssetSourceId<'a>>,
185185
) -> Result<Arc<AssetSource>, MissingAssetSourceError> {
186-
self.data.sources.get(source.into())
186+
self.data
187+
.sources
188+
.read()
189+
.unwrap_or_else(PoisonError::into_inner)
190+
.get(source.into())
187191
}
188192

189193
/// Returns true if the [`AssetServer`] watches for changes.
@@ -1849,7 +1853,13 @@ pub fn handle_internal_asset_events(world: &mut World) {
18491853
}
18501854
};
18511855

1852-
for source in server.data.sources.iter() {
1856+
for source in server
1857+
.data
1858+
.sources
1859+
.read()
1860+
.unwrap_or_else(PoisonError::into_inner)
1861+
.iter()
1862+
{
18531863
match server.data.mode {
18541864
AssetServerMode::Unprocessed => {
18551865
if let Some(receiver) = source.event_receiver() {

0 commit comments

Comments
 (0)