diff --git a/crates/bevy_asset/src/meta.rs b/crates/bevy_asset/src/meta.rs index 0e972261198cc..eac7ec8585f7b 100644 --- a/crates/bevy_asset/src/meta.rs +++ b/crates/bevy_asset/src/meta.rs @@ -67,6 +67,9 @@ pub enum AssetAction { processor: String, settings: ProcessSettings, }, + /// This asset has been decomposed into multiple files. The original asset path can no longer be + /// loaded. + Decomposed, /// Do nothing with the asset Ignore, } @@ -110,6 +113,7 @@ pub struct AssetMetaMinimal { pub enum AssetActionMinimal { Load { loader: String }, Process { processor: String }, + Decomposed, Ignore, } @@ -177,13 +181,12 @@ impl_downcast!(Settings); /// The () processor should never be called. This implementation exists to make the meta format nicer to work with. impl Process for () { type Settings = (); - type OutputLoader = (); async fn process( &self, _context: &mut bevy_asset::processor::ProcessContext<'_>, _meta: AssetMeta<(), Self>, - _writer: &mut bevy_asset::io::Writer, + _writer_context: bevy_asset::processor::WriterContext<'_>, ) -> Result<(), bevy_asset::processor::ProcessError> { unreachable!() } diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 516f83448be1f..c0f8d17a940bf 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -63,8 +63,9 @@ use bevy_platform::{ sync::{PoisonError, RwLock}, }; use bevy_tasks::IoTaskPool; +use core::sync::atomic::AtomicU32; use futures_io::ErrorKind; -use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use futures_lite::{AsyncReadExt, StreamExt}; use futures_util::{select_biased, FutureExt}; use std::{ path::{Path, PathBuf}, @@ -756,15 +757,31 @@ impl AssetProcessor { self.validate_transaction_log_and_recover().await; let mut asset_infos = self.data.processing_state.asset_infos.write().await; - /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty - /// folders when they are discovered. + /// Retrieves asset paths recursively. If `empty_dirs` is Some, it will be used to clean up + /// empty folders when they are discovered. If `check_directory_meta` is true, directories + /// with a meta file next their file path will be treated as an asset path (instead of its + /// contents). async fn get_asset_paths( reader: &dyn ErasedAssetReader, path: PathBuf, + check_directory_meta: bool, paths: &mut Vec, mut empty_dirs: Option<&mut Vec>, ) -> Result { if reader.is_directory(&path).await? { + if check_directory_meta + && match reader.read_meta(&path).await { + Ok(_) => true, + Err(AssetReaderError::NotFound(_)) => false, + Err(err) => return Err(err), + } + { + // If this directory has a meta file, then it is likely a processed asset using + // `ProcessContext::write_partial`, so count the whole thing as an asset path. 
+ paths.push(path); + return Ok(true); + } + let mut path_stream = reader.read_directory(&path).await?; let mut contains_files = false; @@ -772,6 +789,7 @@ impl AssetProcessor { contains_files |= Box::pin(get_asset_paths( reader, child_path, + check_directory_meta, paths, empty_dirs.as_deref_mut(), )) @@ -803,6 +821,7 @@ impl AssetProcessor { get_asset_paths( source.reader(), PathBuf::from(""), + /*check_directory_meta=*/ false, &mut unprocessed_paths, None, ) @@ -814,6 +833,7 @@ impl AssetProcessor { get_asset_paths( processed_reader, PathBuf::from(""), + /*check_directory_meta=*/ true, &mut processed_paths, Some(&mut empty_dirs), ) @@ -826,6 +846,10 @@ impl AssetProcessor { for empty_dir in empty_dirs { // We don't care if this succeeds, since it's just a cleanup task. It is best-effort let _ = processed_writer.remove_empty_directory(&empty_dir).await; + // The directory may also have been an asset that was processed - try to delete its + // meta. If it fails, that either means there was no meta (which is fine), or the + // delete itself failed, which is also fine like above. + let _ = processed_writer.remove_meta(&empty_dir).await; } for path in unprocessed_paths { @@ -835,43 +859,42 @@ impl AssetProcessor { for path in processed_paths { let mut dependencies = Vec::new(); let asset_path = AssetPath::from(path).with_source(source.id()); - if let Some(info) = asset_infos.get_mut(&asset_path) { - match processed_reader.read_meta_bytes(asset_path.path()).await { - Ok(meta_bytes) => { - match ron::de::from_bytes::(&meta_bytes) { - Ok(minimal) => { - trace!( - "Populated processed info for asset {asset_path} {:?}", - minimal.processed_info - ); - - if let Some(processed_info) = &minimal.processed_info { - for process_dependency_info in - &processed_info.process_dependencies - { - dependencies.push(process_dependency_info.path.clone()); - } - } - info.processed_info = minimal.processed_info; - } - Err(err) => { - trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}"); - self.remove_processed_asset_and_meta(source, asset_path.path()) - .await; - } - } - } - Err(err) => { - trace!("Removing processed data for {asset_path} because meta failed to load: {err}"); - self.remove_processed_asset_and_meta(source, asset_path.path()) - .await; - } - } - } else { + let Some(info) = asset_infos.get_mut(&asset_path) else { trace!("Removing processed data for non-existent asset {asset_path}"); self.remove_processed_asset_and_meta(source, asset_path.path()) .await; + continue; + }; + let meta_bytes = match processed_reader.read_meta_bytes(asset_path.path()).await { + Ok(meta_bytes) => meta_bytes, + Err(err) => { + trace!("Removing processed data for {asset_path} because meta failed to load: {err}"); + self.remove_processed_asset_and_meta(source, asset_path.path()) + .await; + continue; + } + }; + let minimal = match ron::de::from_bytes::(&meta_bytes) { + Ok(minimal) => minimal, + Err(err) => { + trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}"); + self.remove_processed_asset_and_meta(source, asset_path.path()) + .await; + continue; + } + }; + + trace!( + "Populated processed info for asset {asset_path} {:?}", + minimal.processed_info + ); + + if let Some(processed_info) = &minimal.processed_info { + for process_dependency_info in &processed_info.process_dependencies { + dependencies.push(process_dependency_info.path.clone()); + } } + info.processed_info = minimal.processed_info; for dependency in dependencies { 
asset_infos.add_dependent(&dependency, asset_path.clone()); @@ -890,11 +913,30 @@ impl AssetProcessor { /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor /// does it remove existing in-memory metadata. async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) { - if let Err(err) = source.processed_writer().unwrap().remove(path).await { - warn!("Failed to remove non-existent asset {path:?}: {err}"); + let reader = source.ungated_processed_reader().unwrap(); + let writer = source.processed_writer().unwrap(); + + // Even if we fail to delete the asset, we may still succeed at deleting its meta and + // ancestors. + 'delete_asset: { + let is_directory = match reader.is_directory(path).await { + Ok(is_directory) => is_directory, + Err(err) => { + warn!("Failed to determine whether asset {path:?} was processed into a directory or an asset: {err}"); + break 'delete_asset; + } + }; + let asset_remove_result = if is_directory { + writer.remove_directory(path).await + } else { + writer.remove(path).await + }; + if let Err(err) = asset_remove_result { + warn!("Failed to remove non-existent asset {path:?}: {err}"); + } } - if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await { + if let Err(err) = writer.remove_meta(path).await { warn!("Failed to remove non-existent meta {path:?}: {err}"); } @@ -902,6 +944,48 @@ impl AssetProcessor { .await; } + /// Removes the processed version of an asset, if it exists. + /// + /// This does not delete its meta file, or any parent directories. This intends for the asset to + /// be overwritten afterwards. + async fn remove_processed_asset_for_overwrite( + &self, + source: &AssetSource, + path: &Path, + ) -> Result<(), ProcessError> { + let reader = source.ungated_processed_reader().unwrap(); + let writer = source.processed_writer().unwrap(); + + let make_path = || { + AssetPath::from_path(path) + .with_source(source.id()) + .into_owned() + }; + let is_directory = match reader.is_directory(path).await { + Ok(is_directory) => is_directory, + // Ignore NotFound errors, since all we care about is that the processed asset isn't + // there anymore. + Err(AssetReaderError::NotFound(_)) => return Ok(()), + Err(err) => { + return Err(ProcessError::AssetReaderError { + path: make_path(), + err, + }); + } + }; + let err = if is_directory { + writer.remove_directory(path).await + } else { + writer.remove(path).await + }; + // The is_directory call succeeded, so we should have something to delete, but it's possible + // the file gets deleted before we get to it here, so be lenient with the error. 
+ if let Err(err) = err { + warn!("Failed to remove existing processed asset: {err}"); + } + Ok(()) + } + async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) { // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder if path.is_absolute() { @@ -987,7 +1071,7 @@ impl AssetProcessor { let meta = processor.deserialize_meta(&meta_bytes)?; (meta, Some(processor)) } - AssetActionMinimal::Ignore => { + AssetActionMinimal::Ignore | AssetActionMinimal::Decomposed => { return Ok(ProcessResult::Ignored); } }; @@ -1078,24 +1162,40 @@ impl AssetProcessor { // Directly writing to the asset destination in the processor necessitates this behavior // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures self.log_begin_processing(asset_path).await; - if let Some(processor) = processor { - let mut writer = processed_writer.write(path).await.map_err(writer_err)?; - let mut processed_meta = { - let mut context = - ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info); - processor - .process(&mut context, source_meta, &mut *writer) - .await? - }; - - writer - .flush() - .await - .map_err(|e| ProcessError::AssetWriterError { - path: asset_path.clone(), - err: AssetWriterError::Io(e), - })?; + // First try to delete the asset if it already exists, so we don't fail to write the asset + // or merge the assets somehow. + self.remove_processed_asset_for_overwrite(source, asset_path.path()) + .await?; + if let Some(processor) = processor { + let mut started_writes = 0; + let mut finished_writes = AtomicU32::new(0); + let mut full_meta = None; + processor + .process( + &mut ProcessContext::new( + self, + asset_path, + &asset_bytes, + &mut new_processed_info, + ), + source_meta, + WriterContext::new( + processed_writer, + &mut started_writes, + &mut finished_writes, + &mut full_meta, + asset_path, + ), + ) + .await?; + + if started_writes == 0 { + return Err(InvalidProcessOutput::NoWriter.into()); + } + if started_writes != finished_writes.into_inner() { + return Err(InvalidProcessOutput::UnfinishedWriter.into()); + } let full_hash = get_full_asset_hash( new_hash, new_processed_info @@ -1103,6 +1203,9 @@ impl AssetProcessor { .iter() .map(|i| i.full_hash), ); + let mut processed_meta = full_meta + .unwrap_or_else(|| Box::new(AssetMeta::<(), ()>::new(AssetAction::Decomposed))); + new_processed_info.full_hash = full_hash; *processed_meta.processed_info_mut() = Some(new_processed_info.clone()); let meta_bytes = processed_meta.serialize(); @@ -1319,7 +1422,7 @@ impl ProcessingState { ) -> Result, AssetReaderError> { let infos = self.asset_infos.read().await; let info = infos - .get(path) + .get_recursive(path) .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?; Ok(info.file_transaction_lock.read_arc().await) } @@ -1329,7 +1432,7 @@ impl ProcessingState { self.wait_until_initialized().await; let mut receiver = { let infos = self.asset_infos.write().await; - let info = infos.get(&path); + let info = infos.get_recursive(&path); match info { Some(info) => match info.status { Some(result) => return result, @@ -1385,16 +1488,13 @@ struct InstrumentedAssetProcessor(T); #[cfg(feature = "trace")] impl Process for InstrumentedAssetProcessor { type Settings = T::Settings; - type OutputLoader = T::OutputLoader; fn process( &self, context: &mut ProcessContext, meta: AssetMeta<(), Self>, - writer: &mut 
crate::io::Writer,
-    ) -> impl ConditionalSendFuture<
-        Output = Result<<T::OutputLoader as AssetLoader>::Settings, ProcessError>,
-    > {
+        writer_context: WriterContext<'_>,
+    ) -> impl ConditionalSendFuture<Output = Result<(), ProcessError>> {
         // Change the processor type for the `AssetMeta`, which works because we share the `Settings` type.
         let meta = AssetMeta {
             meta_format_version: meta.meta_format_version,
@@ -1406,7 +1506,9 @@ impl<T: Process> Process for InstrumentedAssetProcessor<T> {
             processor = core::any::type_name::<T>(),
             asset = context.path().to_string(),
         );
-        self.0.process(context, meta, writer).instrument(span)
+        self.0
+            .process(context, meta, writer_context)
+            .instrument(span)
     }
 }

@@ -1506,6 +1608,33 @@ impl ProcessorAssetInfos {
         self.infos.get(asset_path)
     }

+    /// Gets the [`ProcessorAssetInfo`] associated with `asset_path`, but also looks for directories
+    /// above that are considered processed assets.
+    pub(crate) fn get_recursive(
+        &self,
+        asset_path: &AssetPath<'static>,
+    ) -> Option<&ProcessorAssetInfo> {
+        if let Some(info) = self.infos.get(asset_path) {
+            // Avoid cloning if the path we get has info.
+            return Some(info);
+        }
+
+        // Either the path isn't present at all, or the path is actually a subdirectory of a "multi"
+        // processed asset. So keep exploring up until we're sure there isn't a directory being
+        // processed.
+        let mut path_current = asset_path.clone_owned();
+
+        // PERF: This upward traversal is expensive due to needing many allocations. We could use a
+        // more appropriate data structure like a trie instead.
+        while let Some(path_parent) = path_current.parent() {
+            path_current = path_parent;
+            if let Some(info) = self.infos.get(&path_current) {
+                return Some(info);
+            }
+        }
+        None
+    }
+
     fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
         self.infos.get_mut(asset_path)
     }
diff --git a/crates/bevy_asset/src/processor/process.rs b/crates/bevy_asset/src/processor/process.rs
index b37265d0fb660..458190bf36663 100644
--- a/crates/bevy_asset/src/processor/process.rs
+++ b/crates/bevy_asset/src/processor/process.rs
@@ -1,6 +1,6 @@
 use crate::{
     io::{
-        AssetReaderError, AssetWriterError, MissingAssetWriterError,
+        AssetReaderError, AssetWriterError, ErasedAssetWriter, MissingAssetWriterError,
         MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, SliceReader, Writer,
     },
     meta::{AssetAction, AssetMeta, AssetMetaDyn, ProcessDependencyInfo, ProcessedInfo, Settings},
@@ -15,31 +15,37 @@ use alloc::{
     boxed::Box,
     string::{String, ToString},
 };
+use bevy_platform::collections::HashSet;
 use bevy_tasks::{BoxedFuture, ConditionalSendFuture};
-use core::marker::PhantomData;
+use core::{
+    marker::PhantomData,
+    ops::{Deref, DerefMut},
+    sync::atomic::{AtomicU32, Ordering},
+};
+use futures_lite::AsyncWriteExt;
 use serde::{Deserialize, Serialize};
+use std::{
+    path::{Path, PathBuf},
+    sync::{Mutex, PoisonError},
+};
 use thiserror::Error;

-/// Asset "processor" logic that reads input asset bytes (stored on [`ProcessContext`]), processes the value in some way,
-/// and then writes the final processed bytes with [`Writer`]. The resulting bytes must be loadable with the given [`Process::OutputLoader`].
+/// Asset "processor" logic that reads input asset bytes (stored on [`ProcessContext`]), processes
+/// the value in some way, and then writes the processed assets with [`WriterContext`].
 ///
-/// This is a "low level", maximally flexible interface. Most use cases are better served by the [`LoadTransformAndSave`] implementation
-/// of [`Process`].
+/// This is a "low level", maximally flexible interface. Most use cases are better served by the +/// [`LoadTransformAndSave`] implementation of [`Process`]. pub trait Process: Send + Sync + Sized + 'static { /// The configuration / settings used to process the asset. This will be stored in the [`AssetMeta`] and is user-configurable per-asset. type Settings: Settings + Default + Serialize + for<'a> Deserialize<'a>; - /// The [`AssetLoader`] that will be used to load the final processed asset. - type OutputLoader: AssetLoader; - /// Processes the asset stored on `context` in some way using the settings stored on `meta`. The results are written to `writer`. The - /// final written processed asset is loadable using [`Process::OutputLoader`]. This load will use the returned [`AssetLoader::Settings`]. + /// Processes the asset stored on `context` in some way using the settings stored on `meta`. The + /// results are written to `writer_context`. fn process( &self, context: &mut ProcessContext, meta: AssetMeta<(), Self>, - writer: &mut Writer, - ) -> impl ConditionalSendFuture< - Output = Result<::Settings, ProcessError>, - >; + writer_context: WriterContext<'_>, + ) -> impl ConditionalSendFuture>; } /// A flexible [`Process`] implementation that loads the source [`Asset`] using the `L` [`AssetLoader`], then transforms @@ -158,6 +164,8 @@ pub enum ProcessError { AssetTransformError(Box), #[error("Assets without extensions are not supported.")] ExtensionRequired, + #[error(transparent)] + InvalidProcessOutput(#[from] InvalidProcessOutput), } impl Process for LoadTransformAndSave @@ -168,14 +176,13 @@ where { type Settings = LoadTransformAndSaveSettings; - type OutputLoader = Saver::OutputLoader; async fn process( &self, context: &mut ProcessContext<'_>, meta: AssetMeta<(), Self>, - writer: &mut Writer, - ) -> Result<::Settings, ProcessError> { + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { let AssetAction::Process { settings, .. } = meta.asset else { return Err(ProcessError::WrongMetaType); }; @@ -197,12 +204,16 @@ where let saved_asset = SavedAsset::::from_transformed(&post_transformed_asset); - let output_settings = self - .saver - .save(writer, saved_asset, &settings.saver_settings) + let saver = &self.saver; + let saver_settings = &settings.saver_settings; + let mut writer = writer_context.write_full().await?; + + let output_settings = saver + .save(&mut *writer, saved_asset, saver_settings) .await .map_err(|error| ProcessError::AssetSaveError(error.into()))?; - Ok(output_settings) + + writer.finish::(output_settings).await } } @@ -214,8 +225,8 @@ pub trait ErasedProcessor: Send + Sync { &'a self, context: &'a mut ProcessContext, meta: Box, - writer: &'a mut Writer, - ) -> BoxedFuture<'a, Result, ProcessError>>; + writer_context: WriterContext<'a>, + ) -> BoxedFuture<'a, Result<(), ProcessError>>; /// Deserialized `meta` as type-erased [`AssetMeta`], operating under the assumption that it matches the meta /// for the underlying [`Process`] impl. fn deserialize_meta(&self, meta: &[u8]) -> Result, DeserializeMetaError>; @@ -228,19 +239,13 @@ impl ErasedProcessor for P { &'a self, context: &'a mut ProcessContext, meta: Box, - writer: &'a mut Writer, - ) -> BoxedFuture<'a, Result, ProcessError>> { + writer_context: WriterContext<'a>, + ) -> BoxedFuture<'a, Result<(), ProcessError>> { Box::pin(async move { let meta = meta .downcast::>() .map_err(|_e| ProcessError::WrongMetaType)?; - let loader_settings =
-                <P as Process>::process(self, context, *meta, writer).await?;
-            let output_meta: Box<dyn AssetMetaDyn> =
-                Box::new(AssetMeta::<P::OutputLoader, ()>::new(AssetAction::Load {
-                    loader: core::any::type_name::<P::OutputLoader>().to_string(),
-                    settings: loader_settings,
-                }));
-            Ok(output_meta)
+            <P as Process>
::process(self, context, *meta, writer_context).await }) } @@ -336,3 +341,282 @@ impl<'a> ProcessContext<'a> { self.asset_bytes } } + +/// The context for any writers that a [`Process`] may use. +pub struct WriterContext<'a> { + /// The underlying writer of all writes for the [`Process`]. + writer: &'a dyn ErasedAssetWriter, + /// The context for initializing a write. + // We use a Mutex to avoid requiring a mutable borrow for `write_partial`. See `write_partial` + // for more details. + init_context: Mutex>, + /// The number of writes that have been fully finished. + /// + /// Note we use an `AtomicU32` instead of a u32 so that writes (and therefore finish's) don't + /// need to be synchronous. We use a mutable borrow so that full-writes can just update the + /// value without atomics. + finished_writes: &'a mut AtomicU32, + /// The meta object to write when writing a single file. Must be set to [`Some`] when writing a + /// "full" file. + full_meta: &'a mut Option>, + /// The path of the asset being processed. + path: &'a AssetPath<'static>, +} + +/// The context for the initialization when writing a processed file. +struct WriteInitContext<'a> { + /// The number of writes that have been started. + started_writes: &'a mut u32, + /// The set of currently started [`WriterContext::write_partial`] instances. + /// + /// This protects us from starting writes for the same path multiple times. + started_paths: HashSet, +} + +impl<'a> WriterContext<'a> { + pub(crate) fn new( + writer: &'a dyn ErasedAssetWriter, + started_writes: &'a mut u32, + finished_writes: &'a mut AtomicU32, + full_meta: &'a mut Option>, + path: &'a AssetPath<'static>, + ) -> Self { + Self { + writer, + init_context: Mutex::new(WriteInitContext { + started_writes, + started_paths: HashSet::new(), + }), + finished_writes, + full_meta, + path, + } + } + + /// Start writing a single output file, which can be loaded with the `load_settings`. + /// + /// Returns an error if you have previously called [`Self::write_partial`]. + pub async fn write_full(self) -> Result, ProcessError> { + let started_writes = self + .init_context + .into_inner() + .unwrap_or_else(PoisonError::into_inner) + .started_writes; + if *started_writes != 0 { + return Err(ProcessError::InvalidProcessOutput( + InvalidProcessOutput::FullFileAfterPartialFile, + )); + } + *started_writes = 1; + + let writer = self.writer.write(self.path.path()).await.map_err(|err| { + ProcessError::AssetWriterError { + path: self.path.clone_owned(), + err, + } + })?; + Ok(FullWriter { + writer, + finished_writes: self.finished_writes.get_mut(), + path: self.path, + meta: self.full_meta, + }) + } + + /// Start writing one of multiple output files, which can be loaded with the `load_settings`. + // Note: It would be nice to take this by a mutable reference instead. However, doing so would + // mean that the returned value would be tied to a "mutable reference lifetime", meaning we + // could not use more than one `PartialWriter` instance concurrently. + pub async fn write_partial(&self, file: &Path) -> Result, ProcessError> { + // Do all the validation in a scope so we don't hold the init_context for too long. + { + let mut init_context = self + .init_context + .lock() + .unwrap_or_else(PoisonError::into_inner); + // Check whether this path is valid first so that we don't mark the write as started + // when it hasn't. 
+ if !init_context.started_paths.insert(file.to_path_buf()) { + return Err(InvalidProcessOutput::RepeatedPartialWriteToSamePath( + file.to_path_buf(), + ) + .into()); + } + *init_context.started_writes += 1; + } + + let path = self.path.path().join(file); + let path = AssetPath::from_path_buf(path).with_source(self.path.source().clone_owned()); + + let writer = self + .writer + .write(path.path()) + .await + // Note: It's possible that a user receives the error and then tries to recover, but + // this would leave the process in an invalid state (since you would never be able to + // call `finish` enough times). We could decrement the `started_writes` counter, but + // it's unclear what a reasonable recovery a user could do in this case - just + // propagating the error is safer and makes more sense. + .map_err(|err| ProcessError::AssetWriterError { + path: path.clone_owned(), + err, + })?; + Ok(PartialWriter { + meta_writer: self.writer, + writer, + finished_writes: &*self.finished_writes, + path, + }) + } +} + +/// An error regarding the output state of a [`Process`]. +#[derive(Error, Debug)] +pub enum InvalidProcessOutput { + /// The processor didn't start a write at all. + #[error( + "The processor never started writing a file (never called `write_full` or `write_partial`)" + )] + NoWriter, + /// The processor started a write but never finished it. + #[error("The processor started writing a file, but never called `finish`")] + UnfinishedWriter, + /// The processor started at least one partial write, then continued with a full write. + #[error("The processor called `write_full` after already calling `write_partial`")] + FullFileAfterPartialFile, + /// The processor started a partial write with the same path multiple times. + #[error("The processor called `write_partial` more than once with the same path")] + RepeatedPartialWriteToSamePath(PathBuf), +} + +/// The writer for a [`Process`] writing a single file (at the same path as the unprocessed asset). +pub struct FullWriter<'a> { + /// The writer to write to. + writer: Box, + /// The counter for finished writes that will be incremented when the write completes. + finished_writes: &'a mut u32, + /// The meta object that will be assigned on [`Self::finish`]. + meta: &'a mut Option>, + /// The path of the asset being written. + path: &'a AssetPath<'static>, +} + +impl FullWriter<'_> { + /// Finishes a write and indicates that the written asset should be loaded with the provided + /// loader and the provided settings for that loader. + /// + /// This must be called before the [`Process`] ends. + pub async fn finish( + mut self, + load_settings: L::Settings, + ) -> Result<(), ProcessError> { + self.writer + .flush() + .await + .map_err(|err| ProcessError::AssetWriterError { + path: self.path.clone_owned(), + err: AssetWriterError::Io(err), + })?; + + let output_meta = AssetMeta::::new(AssetAction::Load { + loader: core::any::type_name::().to_string(), + settings: load_settings, + }); + + // This should always be none, since we consumed the WriterContext, and we consume the + // only borrow here. + assert!(self.meta.is_none()); + *self.meta = Some(Box::new(output_meta)); + + // Make sure to increment finished writes at the very end, so that we only count it, once + // the future is finished anyway. + *self.finished_writes += 1; + Ok(()) + } +} + +/// A writer for a [`Process`] writing multiple partial files (as children of the unprocessed asset +/// path). 
+pub struct PartialWriter<'a> { + /// The writer to use when writing the meta file for this file. + meta_writer: &'a dyn ErasedAssetWriter, + /// The writer to write to. + writer: Box, + /// The counter for finished writes that will be incremented when the write completes. + finished_writes: &'a AtomicU32, + /// The path of the file being written. + /// + /// This includes the path relative to the unprocessed asset. + path: AssetPath<'static>, +} + +impl PartialWriter<'_> { + /// Finishes a write and indicates that the written asset should be loaded with the provided + /// loader and the provided settings for that loader. + /// + /// This must be called before the [`Process`] ends. + pub async fn finish( + mut self, + load_settings: L::Settings, + ) -> Result<(), ProcessError> { + self.writer + .flush() + .await + .map_err(|err| ProcessError::AssetWriterError { + path: self.path.clone_owned(), + err: AssetWriterError::Io(err), + })?; + + let output_meta = AssetMeta::::new(AssetAction::Load { + loader: core::any::type_name::().to_string(), + settings: load_settings, + }); + + let output_meta_bytes = AssetMetaDyn::serialize(&output_meta); + + let result = self + .meta_writer + .write_meta_bytes(self.path.path(), &output_meta_bytes) + .await + .map_err(|err| ProcessError::AssetWriterError { + path: self.path.clone_owned(), + err, + }); + + if result.is_ok() { + // The ordering here doesn't really matter, since this is just a cheaper Mutex. + // Just in case, we'll be overly safe and use SeqCst. + self.finished_writes.fetch_add(1, Ordering::SeqCst); + } + + result + } +} + +impl Deref for FullWriter<'_> { + type Target = Writer; + + fn deref(&self) -> &Self::Target { + self.writer.as_ref() + } +} + +impl DerefMut for FullWriter<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.writer.as_mut() + } +} + +impl Deref for PartialWriter<'_> { + type Target = Writer; + + fn deref(&self) -> &Self::Target { + self.writer.as_ref() + } +} + +impl DerefMut for PartialWriter<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.writer.as_mut() + } +} diff --git a/crates/bevy_asset/src/processor/tests.rs b/crates/bevy_asset/src/processor/tests.rs index 68941836e7d7c..fb468dbb769bb 100644 --- a/crates/bevy_asset/src/processor/tests.rs +++ b/crates/bevy_asset/src/processor/tests.rs @@ -1,6 +1,7 @@ use alloc::{ boxed::Box, collections::BTreeMap, + format, string::{String, ToString}, sync::Arc, vec, @@ -28,14 +29,17 @@ use crate::{ AssetReader, AssetReaderError, AssetSourceBuilder, AssetSourceEvent, AssetSourceId, AssetWatcher, PathStream, Reader, }, + meta::AssetMeta, processor::{ - AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorState, ProcessorTransactionLog, - ProcessorTransactionLogFactory, + AssetProcessor, LoadTransformAndSave, LogEntry, Process, ProcessContext, ProcessError, + ProcessStatus, ProcessorState, ProcessorTransactionLog, ProcessorTransactionLogFactory, + WriterContext, }, saver::AssetSaver, tests::{run_app_until, CoolText, CoolTextLoader, CoolTextRon, SubText}, transformer::{AssetTransformer, TransformedAsset}, - Asset, AssetApp, AssetLoader, AssetMode, AssetPath, AssetPlugin, LoadContext, + Asset, AssetApp, AssetLoader, AssetMode, AssetPath, AssetPlugin, AssetServer, Assets, + LoadContext, }; #[derive(Clone)] @@ -516,6 +520,7 @@ fn asset_processor_transforms_asset_with_meta() { #[derive(Asset, TypePath, Serialize, Deserialize)] struct FakeGltf { gltf_nodes: BTreeMap, + gltf_meshes: Vec, } struct FakeGltfLoader; @@ -685,7 +690,8 @@ fn 
asset_processor_loading_can_read_processed_assets() { gltf_nodes: { "name": "thing", "position": "123", - } + }, + gltf_meshes: [], )"#, ); let bsn_path = Path::new("def.bsn"); @@ -843,7 +849,8 @@ fn asset_processor_loading_can_read_source_assets() { gltf_nodes: { "name": "thing", "position": "123", - } + }, + gltf_meshes: [], )"#, ); let gltf_path_2 = Path::new("def.gltf"); @@ -853,7 +860,8 @@ fn asset_processor_loading_can_read_source_assets() { gltf_nodes: { "velocity": "456", "color": "red", - } + }, + gltf_meshes: [], )"#, ); @@ -1326,6 +1334,8 @@ fn only_reprocesses_wrong_hash_on_startup() { let source_changed_asset = Path::new("source_changed.cool.ron"); let dep_unchanged_asset = Path::new("dep_unchanged.cool.ron"); let dep_changed_asset = Path::new("dep_changed.cool.ron"); + let multi_unchanged_asset = Path::new("multi_unchanged.gltf"); + let multi_changed_asset = Path::new("multi_changed.gltf"); let default_source_dir; let default_processed_dir; @@ -1344,22 +1354,43 @@ fn only_reprocesses_wrong_hash_on_startup() { } #[derive(TypePath, Clone)] - struct Count(Arc>, T); + struct Count
<P: Process>(Arc<Mutex<u32>>, P);
+
+    impl<P: Process> Process for Count<P>
{ + type Settings = P::Settings; - impl> MutateAsset for Count { - fn mutate(&self, asset: &mut A) { + async fn process( + &self, + context: &mut ProcessContext<'_>, + meta: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { *self.0.lock().unwrap_or_else(PoisonError::into_inner) += 1; - self.1.mutate(asset); + self.1 + .process(context, AssetMeta::new(meta.asset), writer_context) + .await } } - let transformer = Count(Arc::new(Mutex::new(0)), MergeEmbeddedAndAddText); + let counter = Arc::new(Mutex::new(0)); type CoolTextProcessor = LoadTransformAndSave< CoolTextLoader, - RootAssetTransformer, CoolText>, + RootAssetTransformer, CoolTextSaver, >; + /// Assert that the `unsplit_path` gets split with `subpath` to contain a [`FakeGltf`] with just + /// one mesh. + fn assert_split_gltf(dir: &Dir, unsplit_path: &Path, subpath: &str, data: &str) { + assert_eq!( + read_asset_as_string(dir, &unsplit_path.join(subpath)), + serialize_gltf_to_string(&FakeGltf { + gltf_nodes: Default::default(), + gltf_meshes: vec![data.into()] + }) + ); + } + // Create a scope so that the app is completely gone afterwards (and we can see what happens // after reinitializing). { @@ -1374,12 +1405,19 @@ fn only_reprocesses_wrong_hash_on_startup() { app.init_asset::() .init_asset::() + .init_asset::() .register_asset_loader(CoolTextLoader) - .register_asset_processor(CoolTextProcessor::new( - RootAssetTransformer::new(transformer.clone()), - CoolTextSaver, + .register_asset_processor(Count( + counter.clone(), + CoolTextProcessor::new( + RootAssetTransformer::new(MergeEmbeddedAndAddText), + CoolTextSaver, + ), )) - .set_default_asset_processor::("cool.ron"); + .set_default_asset_processor::>("cool.ron") + .register_asset_loader(FakeGltfLoader) + .register_asset_processor(Count(counter.clone(), FakeGltfSplitProcessor)) + .set_default_asset_processor::>("gltf"); let guard = source_gate.write_blocking(); @@ -1407,6 +1445,21 @@ fn only_reprocesses_wrong_hash_on_startup() { &cool_text_with_embedded("dep_changed", source_changed_asset), ); + default_source_dir.insert_asset_text( + multi_unchanged_asset, + &serialize_gltf_to_string(&FakeGltf { + gltf_nodes: Default::default(), + gltf_meshes: vec!["a1".into(), "a2".into(), "a3".into()], + }), + ); + default_source_dir.insert_asset_text( + multi_changed_asset, + &serialize_gltf_to_string(&FakeGltf { + gltf_nodes: Default::default(), + gltf_meshes: vec!["b1".into(), "b2".into()], + }), + ); + run_app_until_finished_processing(&mut app, guard); assert_eq!( @@ -1425,12 +1478,44 @@ fn only_reprocesses_wrong_hash_on_startup() { read_asset_as_string(&default_processed_dir, dep_changed_asset), serialize_as_cool_text("dep_changed processed source_changed processed") ); + + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh0.gltf", + "a1", + ); + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh1.gltf", + "a2", + ); + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh2.gltf", + "a3", + ); + + assert_split_gltf( + &default_processed_dir, + multi_changed_asset, + "Mesh0.gltf", + "b1", + ); + assert_split_gltf( + &default_processed_dir, + multi_changed_asset, + "Mesh1.gltf", + "b2", + ); } // Assert and reset the processing count. 
assert_eq!( - core::mem::take(&mut *transformer.0.lock().unwrap_or_else(PoisonError::into_inner)), - 4 + core::mem::take(&mut *counter.lock().unwrap_or_else(PoisonError::into_inner)), + 6 ); // Hand-make the app, since we need to pass in our already existing Dirs from the last app. @@ -1469,25 +1554,35 @@ fn only_reprocesses_wrong_hash_on_startup() { app.init_asset::() .init_asset::() .register_asset_loader(CoolTextLoader) - .register_asset_processor(CoolTextProcessor::new( - RootAssetTransformer::new(transformer.clone()), - CoolTextSaver, + .register_asset_processor(Count( + counter.clone(), + CoolTextProcessor::new( + RootAssetTransformer::new(MergeEmbeddedAndAddText), + CoolTextSaver, + ), )) - .set_default_asset_processor::("cool.ron"); + .set_default_asset_processor::>("cool.ron") + .register_asset_loader(FakeGltfLoader) + .register_asset_processor(Count(counter.clone(), FakeGltfSplitProcessor)) + .set_default_asset_processor::>("gltf"); let guard = source_gate.write_blocking(); default_source_dir .insert_asset_text(source_changed_asset, &serialize_as_cool_text("DIFFERENT")); + default_source_dir.insert_asset_text( + multi_changed_asset, + &serialize_gltf_to_string(&FakeGltf { + gltf_nodes: Default::default(), + gltf_meshes: vec!["c1".into()], + }), + ); run_app_until_finished_processing(&mut app, guard); - // Only source_changed and dep_changed assets were reprocessed - all others still have the same - // hashes. - assert_eq!( - *transformer.0.lock().unwrap_or_else(PoisonError::into_inner), - 2 - ); + // Only source_changed, dep_changed, and multi_changed assets were reprocessed - all others + // still have the same hashes. + assert_eq!(*counter.lock().unwrap_or_else(PoisonError::into_inner), 3); assert_eq!( read_asset_as_string(&default_processed_dir, no_deps_asset), @@ -1505,4 +1600,587 @@ fn only_reprocesses_wrong_hash_on_startup() { read_asset_as_string(&default_processed_dir, dep_changed_asset), serialize_as_cool_text("dep_changed processed DIFFERENT processed") ); + + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh0.gltf", + "a1", + ); + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh1.gltf", + "a2", + ); + assert_split_gltf( + &default_processed_dir, + multi_unchanged_asset, + "Mesh2.gltf", + "a3", + ); + + assert_split_gltf( + &default_processed_dir, + multi_changed_asset, + "Mesh0.gltf", + "c1", + ); + // The multi-processing should have deleted the previous files. + assert!(default_processed_dir + .get_asset(&multi_changed_asset.join("Mesh1.gltf")) + .is_none()); +} + +/// Serializes the provided `gltf` into a string (pretty-ly). +fn serialize_gltf_to_string(gltf: &FakeGltf) -> String { + ron::ser::to_string_pretty(gltf, PrettyConfig::new().new_line("\n")) + .expect("Conversion is safe") +} + +#[test] +fn gates_asset_path_on_process() { + let AppWithProcessor { + mut app, + default_source_dirs: + ProcessingDirs { + source: default_source_dir, + .. + }, + .. + } = create_app_with_asset_processor(&[]); + + /// Gates processing on acquiring the provided lock. + /// + /// This has different behavior from [`LockGatedReader`]: [`LockGatedReader`] blocks the + /// processor from even initializing, and asset loads block on initialization before asset. By + /// blocking during processing, we ensure that the loader is actually blocking on the processing + /// of the particular path. + struct GatedProcess
<P: Process>(Arc<async_lock::Mutex<()>>, P);
+
+    impl<P: Process> Process for GatedProcess<P>
{ + type Settings = P::Settings; + + async fn process( + &self, + context: &mut ProcessContext<'_>, + meta: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + let _guard = self.0.lock().await; + self.1 + .process(context, AssetMeta::new(meta.asset), writer_context) + .await + } + } + + type CoolTextProcessor = LoadTransformAndSave< + CoolTextLoader, + RootAssetTransformer, + CoolTextSaver, + >; + + let process_gate = Arc::new(async_lock::Mutex::new(())); + app.init_asset::() + .init_asset::() + .register_asset_loader(CoolTextLoader) + .register_asset_processor::>(GatedProcess( + process_gate.clone(), + CoolTextProcessor::new( + RootAssetTransformer::new(AddText(" processed".into())), + CoolTextSaver, + ), + )) + .set_default_asset_processor::>("cool.ron") + .init_asset::() + .register_asset_loader(FakeGltfLoader) + .register_asset_processor(GatedProcess(process_gate.clone(), FakeGltfSplitProcessor)) + .set_default_asset_processor::>("gltf"); + + // Lock the process gate so that we can't complete processing. + let guard = process_gate.lock_blocking(); + + default_source_dir.insert_asset_text(Path::new("abc.cool.ron"), &serialize_as_cool_text("abc")); + default_source_dir.insert_asset_text( + Path::new("def.gltf"), + &serialize_gltf_to_string(&FakeGltf { + gltf_nodes: Default::default(), + gltf_meshes: vec!["a".into(), "b".into()], + }), + ); + + let processor = app.world().resource::().clone(); + run_app_until(&mut app, |_| { + (bevy_tasks::block_on(processor.get_state()) == ProcessorState::Processing).then_some(()) + }); + + let handle = app + .world() + .resource::() + .load::("abc.cool.ron"); + let handle_multi_a = app + .world() + .resource::() + .load::("def.gltf/Mesh0.gltf"); + let handle_multi_b = app + .world() + .resource::() + .load::("def.gltf/Mesh1.gltf"); + // Update an arbitrary number of times. If at any point, the asset loads, we know we're not + // blocked on processing the asset! Note: If we're not blocking on the processed asset (this + // feature is broken), this test would be flaky on multi_threaded (though it should still + // deterministically fail on single-threaded). + for _ in 0..100 { + app.update(); + assert!(app + .world() + .resource::>() + .get(&handle) + .is_none()); + } + + // Now processing can finish! + drop(guard); + // Wait until the asset finishes loading, now that we're not blocked on the processor. + run_app_until(&mut app, |world| { + // Return None if any of these assets are still missing. + world.resource::>().get(&handle)?; + world.resource::>().get(&handle_multi_a)?; + world.resource::>().get(&handle_multi_b)?; + Some(()) + }); + + assert_eq!( + app.world() + .resource::>() + .get(&handle) + .unwrap() + .text, + "abc processed" + ); + let gltfs = app.world().resource::>(); + assert_eq!( + gltfs.get(&handle_multi_a).unwrap().gltf_meshes, + ["a".to_string()] + ); + assert_eq!( + gltfs.get(&handle_multi_b).unwrap().gltf_meshes, + ["b".to_string()] + ); +} + +/// A processor for [`FakeGltf`] that splits each mesh into its own [`FakeGltf`] file, and its nodes +/// into a [`FakeBsn`] file. 
+struct FakeGltfSplitProcessor; + +impl Process for FakeGltfSplitProcessor { + type Settings = (); + + async fn process( + &self, + context: &mut ProcessContext<'_>, + _meta: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + use ron::ser::PrettyConfig; + + use crate::{ + io::AssetWriterError, + meta::{AssetAction, AssetMeta}, + }; + + let gltf = context + .load_source_asset::(AssetMeta::new(AssetAction::Load { + loader: "bevy_asset::tests::FakeGltfLoader".to_string(), + settings: (), + })) + .await?; + let Ok(gltf) = gltf.downcast::() else { + panic!("It should be impossible to downcast to the wrong type here") + }; + + let root_path = context.path().clone_owned(); + + let gltf = gltf.take(); + for (index, buffer) in gltf.gltf_meshes.into_iter().enumerate() { + let mut writer = writer_context + .write_partial(Path::new(&format!("Mesh{index}.gltf"))) + .await?; + let mesh_data = serialize_gltf_to_string(&FakeGltf { + gltf_meshes: vec![buffer], + gltf_nodes: Default::default(), + }); + writer + .write_all(mesh_data.as_bytes()) + .await + .map_err(|err| ProcessError::AssetWriterError { + path: root_path.clone_owned(), + err: AssetWriterError::Io(err), + })?; + writer.finish::(()).await?; + } + + let mut writer = writer_context + .write_partial(Path::new("Scene0.bsn")) + .await?; + let scene_data = ron::ser::to_string_pretty( + &FakeBsn { + parent_bsn: None, + nodes: gltf.gltf_nodes, + }, + PrettyConfig::new().new_line("\n"), + ) + .expect("Conversion is safe"); + writer + .write_all(scene_data.as_bytes()) + .await + .map_err(|err| ProcessError::AssetWriterError { + path: root_path.clone_owned(), + err: AssetWriterError::Io(err), + })?; + writer.finish::(()).await?; + Ok(()) + } +} + +#[test] +fn asset_processor_can_write_multiple_files() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: + ProcessingDirs { + source: source_dir, + processed: processed_dir, + .. + }, + .. + } = create_app_with_asset_processor(&[]); + + app.register_asset_loader(FakeGltfLoader) + .register_asset_loader(FakeBsnLoader) + .register_asset_processor(FakeGltfSplitProcessor) + .set_default_asset_processor::("gltf"); + + let guard = source_gate.write_blocking(); + + let gltf_path = Path::new("abc.gltf"); + source_dir.insert_asset_text( + gltf_path, + r#"( + gltf_nodes: { + "name": "thing", + "position": "123", + }, + gltf_meshes: ["buffer1", "buffer2", "buffer3"], +)"#, + ); + + run_app_until_finished_processing(&mut app, guard); + + let path_to_data = |path| { + let data = processed_dir.get_asset(Path::new(path)).unwrap(); + let data = str::from_utf8(data.value()).unwrap(); + data.to_string() + }; + + // All the meshes were decomposed into separate asset files. + assert_eq!( + path_to_data("abc.gltf/Mesh0.gltf"), + r#"( + gltf_nodes: {}, + gltf_meshes: [ + "buffer1", + ], +)"# + ); + assert_eq!( + path_to_data("abc.gltf/Mesh1.gltf"), + r#"( + gltf_nodes: {}, + gltf_meshes: [ + "buffer2", + ], +)"# + ); + assert_eq!( + path_to_data("abc.gltf/Mesh2.gltf"), + r#"( + gltf_nodes: {}, + gltf_meshes: [ + "buffer3", + ], +)"# + ); + + // The nodes should have been written to the scene file. + assert_eq!( + path_to_data("abc.gltf/Scene0.bsn"), + r#"( + parent_bsn: None, + nodes: { + "name": "thing", + "position": "123", + }, +)"# + ); +} + +#[test] +fn error_on_no_writer() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: ProcessingDirs { + source: source_dir, .. + }, + .. 
+ } = create_app_with_asset_processor(&[]); + + struct NoWriterProcess; + + impl Process for NoWriterProcess { + type Settings = (); + + async fn process( + &self, + _: &mut ProcessContext<'_>, + _: AssetMeta<(), Self>, + _: WriterContext<'_>, + ) -> Result<(), ProcessError> { + // Don't start a writer! + Ok(()) + } + } + + app.register_asset_processor(NoWriterProcess) + .set_default_asset_processor::("txt"); + + let guard = source_gate.write_blocking(); + source_dir.insert_asset_text(Path::new("whatever.txt"), ""); + + run_app_until_finished_processing(&mut app, guard); + + let process_status = bevy_tasks::block_on( + app.world() + .resource::() + .data() + .wait_until_processed("whatever.txt".into()), + ); + // The process failed due to not having a writer. + assert_eq!(process_status, ProcessStatus::Failed); +} + +#[test] +fn error_on_unfinished_writer() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: ProcessingDirs { + source: source_dir, .. + }, + .. + } = create_app_with_asset_processor(&[]); + + struct UnfinishedWriterProcess; + + impl Process for UnfinishedWriterProcess { + type Settings = (); + + async fn process( + &self, + _: &mut ProcessContext<'_>, + _: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + let _writer = writer_context.write_full().await?; + // Don't call finish on the writer! + Ok(()) + } + } + + app.register_asset_processor(UnfinishedWriterProcess) + .set_default_asset_processor::("txt"); + + let guard = source_gate.write_blocking(); + source_dir.insert_asset_text(Path::new("whatever.txt"), ""); + + run_app_until_finished_processing(&mut app, guard); + + let process_status = bevy_tasks::block_on( + app.world() + .resource::() + .data() + .wait_until_processed("whatever.txt".into()), + ); + // The process failed due to having a writer that we didn't await finish on. + assert_eq!(process_status, ProcessStatus::Failed); +} + +#[test] +fn error_on_full_writer_after_partial_writer() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: ProcessingDirs { + source: source_dir, .. + }, + .. + } = create_app_with_asset_processor(&[]); + + struct FullAfterPartialWriterProcess; + + impl Process for FullAfterPartialWriterProcess { + type Settings = (); + + async fn process( + &self, + _: &mut ProcessContext<'_>, + _: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + // Properly write a "partial". + let writer = writer_context.write_partial(Path::new("multi.txt")).await?; + writer.finish::(()).await?; + + // Now trying writing "full", which conflicts! + let writer = writer_context.write_full().await?; + writer.finish::(()).await?; + + Ok(()) + } + } + + app.register_asset_processor(FullAfterPartialWriterProcess) + .set_default_asset_processor::("txt"); + + let guard = source_gate.write_blocking(); + source_dir.insert_asset_text(Path::new("whatever.txt"), ""); + + run_app_until_finished_processing(&mut app, guard); + + let process_status = bevy_tasks::block_on( + app.world() + .resource::() + .data() + .wait_until_processed("whatever.txt".into()), + ); + // The process failed due to having a full writer after a partial writer. + assert_eq!(process_status, ProcessStatus::Failed); +} + +#[test] +fn processor_can_parallelize_partial_writes() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: + ProcessingDirs { + source: source_dir, + processed: processed_dir, + .. + }, + .. 
+ } = create_app_with_asset_processor(&[]); + + struct ParallelizedWriterProcess; + + impl Process for ParallelizedWriterProcess { + type Settings = (); + + async fn process( + &self, + _: &mut ProcessContext<'_>, + _: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + let mut writer_1 = writer_context.write_partial(Path::new("a.txt")).await?; + let mut writer_2 = writer_context.write_partial(Path::new("b.txt")).await?; + + // Note: this call is blocking, so it's undesirable in production code using + // single-threaded mode (e.g., platforms like Wasm). For this test though, it's not a + // big deal. + bevy_tasks::IoTaskPool::get().scope(|scope| { + scope.spawn(async { + writer_1.write_all(b"abc123").await.unwrap(); + writer_1.finish::(()).await.unwrap(); + }); + scope.spawn(async { + writer_2.write_all(b"def456").await.unwrap(); + writer_2.finish::(()).await.unwrap(); + }); + }); + + Ok(()) + } + } + + app.register_asset_processor(ParallelizedWriterProcess) + .set_default_asset_processor::("txt"); + + let guard = source_gate.write_blocking(); + source_dir.insert_asset_text(Path::new("whatever.txt"), ""); + + run_app_until_finished_processing(&mut app, guard); + + assert_eq!( + &read_asset_as_string(&processed_dir, Path::new("whatever.txt/a.txt")), + "abc123" + ); + assert_eq!( + &read_asset_as_string(&processed_dir, Path::new("whatever.txt/b.txt")), + "def456" + ); +} + +#[test] +fn error_on_two_partial_writes_for_same_path() { + let AppWithProcessor { + mut app, + source_gate, + default_source_dirs: ProcessingDirs { + source: source_dir, .. + }, + .. + } = create_app_with_asset_processor(&[]); + + struct TwoPartialWritesForSamePathProcess; + + impl Process for TwoPartialWritesForSamePathProcess { + type Settings = (); + + async fn process( + &self, + _: &mut ProcessContext<'_>, + _: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + // Properly write a "partial". + let writer = writer_context.write_partial(Path::new("multi.txt")).await?; + writer.finish::(()).await?; + + // Properly write to the same "partial". + let writer = writer_context.write_partial(Path::new("multi.txt")).await?; + writer.finish::(()).await?; + + Ok(()) + } + } + + app.register_asset_processor(TwoPartialWritesForSamePathProcess) + .set_default_asset_processor::("txt"); + + let guard = source_gate.write_blocking(); + source_dir.insert_asset_text(Path::new("whatever.txt"), ""); + + run_app_until_finished_processing(&mut app, guard); + + let process_status = bevy_tasks::block_on( + app.world() + .resource::() + .data() + .wait_until_processed("whatever.txt".into()), + ); + // The process failed due to writing "partial" to the same path twice. + assert_eq!(process_status, ProcessStatus::Failed); } diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 9f7d594e5d3bb..a35041b680406 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -1460,6 +1460,11 @@ impl AssetServer { path: asset_path.clone_owned(), }) } + AssetActionMinimal::Decomposed => { + return Err(AssetLoadError::CannotLoadDecomposedAsset { + path: asset_path.clone_owned(), + }) + } AssetActionMinimal::Ignore => { return Err(AssetLoadError::CannotLoadIgnoredAsset { path: asset_path.clone_owned(), @@ -2049,6 +2054,9 @@ pub enum AssetLoadError { #[error("Asset '{path}' is configured to be processed. 
It cannot be loaded directly.")] #[from(ignore)] CannotLoadProcessedAsset { path: AssetPath<'static> }, + #[error("Asset '{path}' is the root of an asset that has been decomposed through processing. It cannot be loaded directly.")] + #[from(ignore)] + CannotLoadDecomposedAsset { path: AssetPath<'static> }, #[error("Asset '{path}' is configured to be ignored. It cannot be loaded.")] #[from(ignore)] CannotLoadIgnoredAsset { path: AssetPath<'static> }, diff --git a/examples/asset/processing/asset_processing.rs b/examples/asset/processing/asset_processing.rs index d5da644c27190..d7dfd95857527 100644 --- a/examples/asset/processing/asset_processing.rs +++ b/examples/asset/processing/asset_processing.rs @@ -4,7 +4,8 @@ use bevy::{ asset::{ embedded_asset, io::{Reader, Writer}, - processor::LoadTransformAndSave, + meta::AssetMeta, + processor::{LoadTransformAndSave, Process, ProcessContext, ProcessError, WriterContext}, saver::{AssetSaver, SavedAsset}, transformer::{AssetTransformer, TransformedAsset}, AssetLoader, AsyncWriteExt, LoadContext, @@ -13,7 +14,7 @@ use bevy::{ reflect::TypePath, }; use serde::{Deserialize, Serialize}; -use std::convert::Infallible; +use std::{convert::Infallible, io::BufRead, path::Path}; use thiserror::Error; fn main() { @@ -62,7 +63,9 @@ impl Plugin for TextPlugin { .register_asset_processor::>( LoadTransformAndSave::new(CoolTextTransformer, CoolTextSaver), ) - .set_default_asset_processor::>("cool.ron"); + .set_default_asset_processor::>("cool.ron") + .register_asset_processor(LineSplitterProcess) + .set_default_asset_processor::("lines"); } } @@ -225,6 +228,37 @@ impl AssetSaver for CoolTextSaver { } } +struct LineSplitterProcess; + +impl Process for LineSplitterProcess { + type Settings = (); + + async fn process( + &self, + context: &mut ProcessContext<'_>, + _meta: AssetMeta<(), Self>, + writer_context: WriterContext<'_>, + ) -> Result<(), ProcessError> { + let bytes = context.asset_bytes(); + if bytes.is_empty() { + return Err(ProcessError::AssetTransformError("empty asset".into())); + } + for (i, line) in bytes.lines().map(Result::unwrap).enumerate() { + let mut writer = writer_context + .write_partial(Path::new(&format!("Line{i}.line"))) + .await?; + writer.write_all(line.as_bytes()).await.map_err(|err| { + ProcessError::AssetWriterError { + path: context.path().clone_owned(), + err: err.into(), + } + })?; + writer.finish::(TextSettings::default()).await?; + } + Ok(()) + } +} + #[derive(Resource)] struct TextAssets { a: Handle, @@ -232,6 +266,10 @@ struct TextAssets { c: Handle, d: Handle, e: Handle, + multi_0: Handle, + multi_1: Handle, + multi_2: Handle, + multi_3: Handle, } fn setup(mut commands: Commands, assets: Res) { @@ -243,6 +281,10 @@ fn setup(mut commands: Commands, assets: Res) { c: assets.load("foo/c.cool.ron"), d: assets.load("d.cool.ron"), e: assets.load("embedded://asset_processing/e.txt"), + multi_0: assets.load("multi.lines/Line0.line"), + multi_1: assets.load("multi.lines/Line1.line"), + multi_2: assets.load("multi.lines/Line2.line"), + multi_3: assets.load("multi.lines/Line3.line"), }); } @@ -260,6 +302,10 @@ fn print_text( println!(" c: {:?}", texts.get(&handles.c)); println!(" d: {:?}", texts.get(&handles.d)); println!(" e: {:?}", texts.get(&handles.e)); + println!(" multi_0: {:?}", texts.get(&handles.multi_0)); + println!(" multi_1: {:?}", texts.get(&handles.multi_1)); + println!(" multi_2: {:?}", texts.get(&handles.multi_2)); + println!(" multi_3: {:?}", texts.get(&handles.multi_3)); println!("(You can modify source assets and their 
.meta files to hot-reload changes!");
     println!();
     asset_events.clear();
diff --git a/examples/asset/processing/assets/multi.lines b/examples/asset/processing/assets/multi.lines
new file mode 100644
index 0000000000000..d769af3581c15
--- /dev/null
+++ b/examples/asset/processing/assets/multi.lines
@@ -0,0 +1,4 @@
+A line of text,
+this one's next!
+A single process,
+produces excess.
\ No newline at end of file
diff --git a/release-content/migration-guides/process_trait_changes.md b/release-content/migration-guides/process_trait_changes.md
new file mode 100644
index 0000000000000..d8361b8e52c68
--- /dev/null
+++ b/release-content/migration-guides/process_trait_changes.md
@@ -0,0 +1,55 @@
+---
+title: The `Process` trait no longer has a single output.
+pull_requests: []
+---
+
+In previous versions, the `Process` trait had an associated type for the output loader, and took a
+`writer` argument. This is no longer the case (in order to support one-to-many asset processing).
+This change requires that users indicate whether they are using the "full" writer mode or the
+"partial" writer mode.
+
+If your previous trait implementation of `Process` looked like this:
+
+```rust
+impl Process for MyThing {
+    type Settings = MyThingSettings;
+    type OutputLoader = SomeLoader;
+
+    async fn process(
+        &self,
+        context: &mut ProcessContext<'_>,
+        meta: AssetMeta<(), Self>,
+        writer: &mut Writer,
+    ) -> Result<<SomeLoader as AssetLoader>::Settings, ProcessError> {
+        // Write to `writer`, then return the settings for `SomeLoader`.
+        let settings = todo!();
+        Ok(settings)
+    }
+}
+```
+
+Now it needs to look like:
+
+```rust
+impl Process for MyThing {
+    type Settings = MyThingSettings;
+
+    async fn process(
+        &self,
+        context: &mut ProcessContext<'_>,
+        meta: AssetMeta<(), Self>,
+        writer_context: WriterContext<'_>,
+    ) -> Result<(), ProcessError> {
+        let mut writer = writer_context.write_full().await?;
+        // Write to `writer`, then finish with the settings for `SomeLoader`.
+        let settings = todo!();
+        writer.finish::<SomeLoader>(settings).await
+    }
+}
+```
+
+In addition, the returned `writer` is a wrapper that dereferences to `Writer`. This means you may
+need to explicitly dereference it (e.g., `&mut *writer`) in order to pass it to functions that
+take a `&mut Writer`.
+
+This does not apply if you are using the `LoadTransformAndSave` process - existing uses should
+continue to work.
diff --git a/release-content/release-notes/one_to_many_processing.md b/release-content/release-notes/one_to_many_processing.md
new file mode 100644
index 0000000000000..290d63b686762
--- /dev/null
+++ b/release-content/release-notes/one_to_many_processing.md
@@ -0,0 +1,56 @@
+---
+title: One-to-many Asset Processing
+authors: ["@andriyDev"]
+pull_requests: []
+---
+
+In previous versions, asset processing was always one-to-one: a processor would be given a single
+asset to process and write to a single file.
+
+Now, an asset processor can write to multiple files! When implementing the `Process` trait, you can
+call `writer_context.write_partial` and provide the path relative to the original asset.
+So for example, here we have a processor that reads all the lines in a file and writes each of
+them to its own file:
+
+```rust
+struct LineSplitterProcess;
+
+impl Process for LineSplitterProcess {
+    type Settings = ();
+
+    async fn process(
+        &self,
+        context: &mut ProcessContext<'_>,
+        _meta: AssetMeta<(), Self>,
+        writer_context: WriterContext<'_>,
+    ) -> Result<(), ProcessError> {
+        let bytes = context.asset_bytes();
+        if bytes.is_empty() {
+            return Err(ProcessError::AssetTransformError("empty asset".into()));
+        }
+        for (i, line) in bytes.lines().map(Result::unwrap).enumerate() {
+            let mut writer = writer_context
+                .write_partial(Path::new(&format!("Line{i}.line")))
+                .await?;
+            writer.write_all(line.as_bytes()).await.map_err(|err| {
+                ProcessError::AssetWriterError {
+                    path: context.path().clone_owned(),
+                    err: err.into(),
+                }
+            })?;
+            writer.finish::<TextLoader>(TextSettings::default()).await?;
+        }
+        Ok(())
+    }
+}
+```
+
+Then if you have an asset like `shakespeare.txt`, you can load these separate files as
+`shakespeare.txt/Line0.line`, `shakespeare.txt/Line1.line`, etc. These separate files can have
+different file extensions, be loaded as completely separate asset types, or be entirely produced
+from scratch within the asset processor! These files are treated as completely distinct assets, so
+loading them looks like a regular asset load (e.g.,
+`asset_server.load("shakespeare.txt/Line1.line")`).
+
+We plan to use this to break apart large glTF files into smaller, easier-to-load pieces -
+particularly for producing virtual geometry meshes.
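+
+Since each decomposed file is just a normal asset path, nothing special is needed on the loading
+side. Below is a minimal sketch of loading the split outputs from a Bevy system; it assumes the
+`LineSplitterProcess` above is registered as the default processor for the source file's
+extension, and that `Text` is the asset type produced by `TextLoader` (names borrowed from the
+`asset_processing` example):
+
+```rust
+use bevy::prelude::*;
+
+#[derive(Resource)]
+struct LineHandles {
+    first: Handle<Text>,
+    second: Handle<Text>,
+}
+
+// A startup system: each decomposed file is addressed as a child of the original asset path.
+fn load_lines(asset_server: Res<AssetServer>, mut commands: Commands) {
+    let first = asset_server.load("shakespeare.txt/Line0.line");
+    let second = asset_server.load("shakespeare.txt/Line1.line");
+    commands.insert_resource(LineHandles { first, second });
+}
+```
+
+Note that once an asset has been decomposed, its root path (`shakespeare.txt` here) can no longer
+be loaded directly: the asset server reports an error for it.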