Skip to content

Commit ec127c4

Browse files
authored
Share AssetSources between the AssetServer, the asset processor, and the asset processor's internal asset server. (#21763)
# Objective - Previously, we would build the sources up to 3 times with different options: once for the regular asset server, once for the asset processor, and once for the asset processor's internal asset server. - This is a step towards #21758 (since now adding a source to this shared `AssetSources` will be reflected in all the uses. ## Solution 1) Skip all the hot-reloading polling if `watch_for_changes` is false. If we don't do this, sharing the sources between the asset server and the processor-internal asset server will result in two tasks consuming asset events, so the regular asset server will miss asset events. 2) Move the state of the asset processor into a separate struct that we then Arc. We need to be able to gate the processed asset reader, but we can't create an asset processor without the sources. So instead we allow ourselves to create the state first, so that we can gate on that state, and then create the processor with that state. 3) Split the processed reader into a gated and an ungated form. The gate first blocks on the processor being done initialized, followed by gating on the per-asset lock. However the asset processor needs to iterate through all the directories in order to finish initializing. So just unconditionally gating doesn't work - we hit a deadlock. So we provide access to the ungated processed reader, so that the processor can use that to initialize its state. 4) Finally do the sharing! One thing I'm starting in this PR is making things more private. For example, it's not clear why `ProcessorGatedReader` was `pub`. I've also made `AssetSources::gate_on_processor` no longer `pub`. This does mean that users can no longer initialize their own `AssetServer` properly (e.g., gated on the processor), but I don't think we should really support this - our focus should be on the `AssetServer` initialized by `AssetPlugin`, not hypothetical uses where someone wants to insert their own `AssetServer`. ## Testing - CI
1 parent 4809a3c commit ec127c4

File tree

6 files changed

+243
-94
lines changed

6 files changed

+243
-94
lines changed

crates/bevy_asset/src/io/processor_gated.rs

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
3-
processor::{AssetProcessorData, ProcessStatus},
3+
processor::{ProcessStatus, ProcessingState},
44
AssetPath,
55
};
66
use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
@@ -16,46 +16,33 @@ use super::{AsyncSeekForward, ErasedAssetReader};
1616
/// given path until that path has been processed by [`AssetProcessor`].
1717
///
1818
/// [`AssetProcessor`]: crate::processor::AssetProcessor
19-
pub struct ProcessorGatedReader {
20-
reader: Box<dyn ErasedAssetReader>,
19+
pub(crate) struct ProcessorGatedReader {
20+
reader: Arc<dyn ErasedAssetReader>,
2121
source: AssetSourceId<'static>,
22-
processor_data: Arc<AssetProcessorData>,
22+
processing_state: Arc<ProcessingState>,
2323
}
2424

2525
impl ProcessorGatedReader {
2626
/// Creates a new [`ProcessorGatedReader`].
27-
pub fn new(
27+
pub(crate) fn new(
2828
source: AssetSourceId<'static>,
29-
reader: Box<dyn ErasedAssetReader>,
30-
processor_data: Arc<AssetProcessorData>,
29+
reader: Arc<dyn ErasedAssetReader>,
30+
processing_state: Arc<ProcessingState>,
3131
) -> Self {
3232
Self {
3333
source,
34-
processor_data,
3534
reader,
35+
processing_state,
3636
}
3737
}
38-
39-
/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
40-
/// while it is held.
41-
async fn get_transaction_lock(
42-
&self,
43-
path: &AssetPath<'static>,
44-
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
45-
let infos = self.processor_data.asset_infos.read().await;
46-
let info = infos
47-
.get(path)
48-
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
49-
Ok(info.file_transaction_lock.read_arc().await)
50-
}
5138
}
5239

5340
impl AssetReader for ProcessorGatedReader {
5441
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
5542
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
5643
trace!("Waiting for processing to finish before reading {asset_path}");
5744
let process_result = self
58-
.processor_data
45+
.processing_state
5946
.wait_until_processed(asset_path.clone())
6047
.await;
6148
match process_result {
@@ -65,7 +52,10 @@ impl AssetReader for ProcessorGatedReader {
6552
}
6653
}
6754
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
68-
let lock = self.get_transaction_lock(&asset_path).await?;
55+
let lock = self
56+
.processing_state
57+
.get_transaction_lock(&asset_path)
58+
.await?;
6959
let asset_reader = self.reader.read(path).await?;
7060
let reader = TransactionLockedReader::new(asset_reader, lock);
7161
Ok(reader)
@@ -75,7 +65,7 @@ impl AssetReader for ProcessorGatedReader {
7565
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
7666
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
7767
let process_result = self
78-
.processor_data
68+
.processing_state
7969
.wait_until_processed(asset_path.clone())
8070
.await;
8171
match process_result {
@@ -85,7 +75,10 @@ impl AssetReader for ProcessorGatedReader {
8575
}
8676
}
8777
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
88-
let lock = self.get_transaction_lock(&asset_path).await?;
78+
let lock = self
79+
.processing_state
80+
.get_transaction_lock(&asset_path)
81+
.await?;
8982
let meta_reader = self.reader.read_meta(path).await?;
9083
let reader = TransactionLockedReader::new(meta_reader, lock);
9184
Ok(reader)
@@ -99,7 +92,7 @@ impl AssetReader for ProcessorGatedReader {
9992
"Waiting for processing to finish before reading directory {:?}",
10093
path
10194
);
102-
self.processor_data.wait_until_finished().await;
95+
self.processing_state.wait_until_finished().await;
10396
trace!("Processing finished, reading directory {:?}", path);
10497
let result = self.reader.read_directory(path).await?;
10598
Ok(result)
@@ -110,7 +103,7 @@ impl AssetReader for ProcessorGatedReader {
110103
"Waiting for processing to finish before reading directory {:?}",
111104
path
112105
);
113-
self.processor_data.wait_until_finished().await;
106+
self.processing_state.wait_until_finished().await;
114107
trace!("Processing finished, getting directory status {:?}", path);
115108
let result = self.reader.is_directory(path).await?;
116109
Ok(result)

crates/bevy_asset/src/io/source.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
io::{processor_gated::ProcessorGatedReader, AssetSourceEvent, AssetWatcher},
3-
processor::AssetProcessorData,
3+
processor::ProcessingState,
44
};
55
use alloc::{
66
boxed::Box,
@@ -180,7 +180,12 @@ impl AssetSourceBuilder {
180180
id: id.clone(),
181181
reader,
182182
writer,
183-
processed_reader: self.processed_reader.as_mut().map(|r| r()),
183+
processed_reader: self
184+
.processed_reader
185+
.as_mut()
186+
.map(|r| r())
187+
.map(Into::<Arc<_>>::into),
188+
ungated_processed_reader: None,
184189
processed_writer,
185190
event_receiver: None,
186191
watcher: None,
@@ -386,7 +391,12 @@ pub struct AssetSource {
386391
id: AssetSourceId<'static>,
387392
reader: Box<dyn ErasedAssetReader>,
388393
writer: Option<Box<dyn ErasedAssetWriter>>,
389-
processed_reader: Option<Box<dyn ErasedAssetReader>>,
394+
processed_reader: Option<Arc<dyn ErasedAssetReader>>,
395+
/// The ungated version of `processed_reader`.
396+
///
397+
/// This allows the processor to read all the processed assets to initialize itself without
398+
/// being gated on itself (causing a deadlock).
399+
ungated_processed_reader: Option<Arc<dyn ErasedAssetReader>>,
390400
processed_writer: Option<Box<dyn ErasedAssetWriter>>,
391401
watcher: Option<Box<dyn AssetWatcher>>,
392402
processed_watcher: Option<Box<dyn AssetWatcher>>,
@@ -425,6 +435,13 @@ impl AssetSource {
425435
.ok_or_else(|| MissingProcessedAssetReaderError(self.id.clone_owned()))
426436
}
427437

438+
/// Return's this source's ungated processed [`AssetReader`](crate::io::AssetReader), if it
439+
/// exists.
440+
#[inline]
441+
pub(crate) fn ungated_processed_reader(&self) -> Option<&dyn ErasedAssetReader> {
442+
self.ungated_processed_reader.as_deref()
443+
}
444+
428445
/// Return's this source's processed [`AssetWriter`](crate::io::AssetWriter), if it exists.
429446
#[inline]
430447
pub fn processed_writer(
@@ -560,12 +577,13 @@ impl AssetSource {
560577

561578
/// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until
562579
/// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset.
563-
pub fn gate_on_processor(&mut self, processor_data: Arc<AssetProcessorData>) {
580+
pub(crate) fn gate_on_processor(&mut self, processing_state: Arc<ProcessingState>) {
564581
if let Some(reader) = self.processed_reader.take() {
565-
self.processed_reader = Some(Box::new(ProcessorGatedReader::new(
582+
self.ungated_processed_reader = Some(reader.clone());
583+
self.processed_reader = Some(Arc::new(ProcessorGatedReader::new(
566584
self.id(),
567585
reader,
568-
processor_data,
586+
processing_state,
569587
)));
570588
}
571589
}
@@ -622,9 +640,9 @@ impl AssetSources {
622640

623641
/// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until
624642
/// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset.
625-
pub fn gate_on_processor(&mut self, processor_data: Arc<AssetProcessorData>) {
643+
pub(crate) fn gate_on_processor(&mut self, processing_state: Arc<ProcessingState>) {
626644
for source in self.iter_processed_mut() {
627-
source.gate_on_processor(processor_data.clone());
645+
source.gate_on_processor(processing_state.clone());
628646
}
629647
}
630648
}

crates/bevy_asset/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ impl Plugin for AssetPlugin {
375375
let sources = builders.build_sources(watch, false);
376376

377377
app.insert_resource(AssetServer::new_with_meta_check(
378-
sources,
378+
Arc::new(sources),
379379
AssetServerMode::Unprocessed,
380380
self.meta_check.clone(),
381381
watch,
@@ -388,9 +388,7 @@ impl Plugin for AssetPlugin {
388388
.unwrap_or(cfg!(feature = "asset_processor"));
389389
if use_asset_processor {
390390
let mut builders = app.world_mut().resource_mut::<AssetSourceBuilders>();
391-
let processor = AssetProcessor::new(&mut builders);
392-
let mut sources = builders.build_sources(false, watch);
393-
sources.gate_on_processor(processor.data.clone());
391+
let (processor, sources) = AssetProcessor::new(&mut builders, watch);
394392
// the main asset server shares loaders with the processor asset server
395393
app.insert_resource(AssetServer::new_with_loaders(
396394
sources,
@@ -406,7 +404,7 @@ impl Plugin for AssetPlugin {
406404
let mut builders = app.world_mut().resource_mut::<AssetSourceBuilders>();
407405
let sources = builders.build_sources(false, watch);
408406
app.insert_resource(AssetServer::new_with_meta_check(
409-
sources,
407+
Arc::new(sources),
410408
AssetServerMode::Processed,
411409
AssetMetaCheck::Always,
412410
watch,

0 commit comments

Comments
 (0)