Skip to content

Commit ae65f24

Browse files
committed
Implemented Joined and Accessed for AnyBuffer and JsonBuffer
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 792e0b6 commit ae65f24

File tree

6 files changed

+470
-95
lines changed

6 files changed

+470
-95
lines changed

src/buffer/any_buffer.rs

Lines changed: 199 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*
1616
*/
1717

18+
// TODO(@mxgrey): Add module-level documentation describing how to use AnyBuffer
19+
1820
use std::{
1921
any::{Any, TypeId},
2022
collections::HashMap,
@@ -29,14 +31,17 @@ use bevy_ecs::{
2931

3032
use thiserror::Error as ThisError;
3133

34+
use smallvec::SmallVec;
35+
3236
use crate::{
33-
Buffer, BufferAccessLifecycle, BufferAccessMut, BufferError, BufferKey, BufferStorage,
34-
DrainBuffer, NotifyBufferUpdate, GateState, Gate, OperationResult, OperationError,
35-
InspectBuffer, ManageBuffer,
37+
Buffer, Buffered, Bufferable, BufferAccessLifecycle, BufferAccessMut, BufferAccessors,
38+
BufferError, BufferKey, BufferStorage, Builder, DrainBuffer, NotifyBufferUpdate,
39+
GateState, Gate, OperationResult, OperationError, OperationRoster, OrBroken, InspectBuffer,
40+
ManageBuffer, Joined, Accessed, add_listener_to_source,
3641
};
3742

38-
/// A [`Buffer`] whose type has been anonymized. Joining with this buffer type
39-
/// will yield an [`AnyMessage`].
43+
/// A [`Buffer`] whose message type has been anonymized. Joining with this buffer
44+
/// type will yield an [`AnyMessage`].
4045
#[derive(Clone, Copy)]
4146
pub struct AnyBuffer {
4247
pub(crate) scope: Entity,
@@ -56,7 +61,7 @@ impl std::fmt::Debug for AnyBuffer {
5661

5762
impl AnyBuffer {
5863
/// Downcast this into a concrete [`Buffer`] type.
59-
pub fn into_buffer<T: 'static>(&self) -> Option<Buffer<T>> {
64+
pub fn downcast<T: 'static>(&self) -> Option<Buffer<T>> {
6065
if TypeId::of::<T>() == self.interface.message_type_id() {
6166
Some(Buffer {
6267
scope: self.scope,
@@ -87,29 +92,16 @@ impl<T: 'static + Send + Sync + Any> From<Buffer<T>> for AnyBuffer {
8792
/// of a buffer.
8893
#[derive(Clone)]
8994
pub struct AnyBufferKey {
90-
buffer: Entity,
91-
session: Entity,
92-
accessor: Entity,
93-
lifecycle: Option<Arc<BufferAccessLifecycle>>,
94-
interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
95-
}
96-
97-
impl std::fmt::Debug for AnyBufferKey {
98-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99-
f
100-
.debug_struct("AnyBufferKey")
101-
.field("buffer", &self.buffer)
102-
.field("session", &self.session)
103-
.field("accessor", &self.accessor)
104-
.field("in_use", &self.lifecycle.as_ref().is_some_and(|l| l.is_in_use()))
105-
.field("message_type_name", &self.interface.message_type_name())
106-
.finish()
107-
}
95+
pub(crate) buffer: Entity,
96+
pub(crate) session: Entity,
97+
pub(crate) accessor: Entity,
98+
pub(crate) lifecycle: Option<Arc<BufferAccessLifecycle>>,
99+
pub(crate) interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
108100
}
109101

110102
impl AnyBufferKey {
111103
/// Downcast this into a concrete [`BufferKey`] type.
112-
pub fn into_buffer_key<T: 'static>(&self) -> Option<BufferKey<T>> {
104+
pub fn downcast<T: 'static>(&self) -> Option<BufferKey<T>> {
113105
if TypeId::of::<T>() == self.interface.message_type_id() {
114106
Some(BufferKey {
115107
buffer: self.buffer,
@@ -122,6 +114,42 @@ impl AnyBufferKey {
122114
None
123115
}
124116
}
117+
118+
/// The buffer ID of this key.
119+
pub fn id(&self) -> Entity {
120+
self.buffer
121+
}
122+
123+
/// The session that this key belongs to.
124+
pub fn session(&self) -> Entity {
125+
self.session
126+
}
127+
128+
fn deep_clone(&self) -> Self {
129+
let mut deep = self.clone();
130+
deep.lifecycle = self
131+
.lifecycle
132+
.as_ref()
133+
.map(|l| Arc::new(l.as_ref().clone()));
134+
deep
135+
}
136+
137+
fn is_in_use(&self) -> bool {
138+
self.lifecycle.as_ref().is_some_and(|l| l.is_in_use())
139+
}
140+
}
141+
142+
impl std::fmt::Debug for AnyBufferKey {
143+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144+
f
145+
.debug_struct("AnyBufferKey")
146+
.field("buffer", &self.buffer)
147+
.field("session", &self.session)
148+
.field("accessor", &self.accessor)
149+
.field("in_use", &self.lifecycle.as_ref().is_some_and(|l| l.is_in_use()))
150+
.field("message_type_name", &self.interface.message_type_name())
151+
.finish()
152+
}
125153
}
126154

127155
impl<T: 'static + Send + Sync + Any> From<BufferKey<T>> for AnyBufferKey {
@@ -568,7 +596,7 @@ impl<T: 'static + Send + Sync + Any> AnyBufferAccessMutState for SystemState<Buf
568596
}
569597
}
570598

571-
trait AnyBufferAccessMut<'w, 's> {
599+
pub(crate) trait AnyBufferAccessMut<'w, 's> {
572600
fn as_any_buffer_mut<'a>(&'a mut self, key: &AnyBufferKey) -> Result<AnyBufferMut<'w, 's, 'a>, BufferError>;
573601
}
574602

@@ -605,16 +633,22 @@ pub(crate) trait AnyBufferAccessInterface {
605633
session: Entity,
606634
) -> OperationResult;
607635

636+
fn pull(
637+
&self,
638+
entity_mut: &mut EntityWorldMut,
639+
session: Entity,
640+
) -> Result<AnyMessage, OperationError>;
641+
608642
fn create_any_buffer_access_mut_state(
609643
&self,
610644
world: &mut World,
611645
) -> Box<dyn AnyBufferAccessMutState>;
612646
}
613647

614-
struct AnyBufferAccessImpl<T>(std::marker::PhantomData<T>);
648+
pub(crate) struct AnyBufferAccessImpl<T>(std::marker::PhantomData<T>);
615649

616650
impl<T: 'static + Send + Sync + Any> AnyBufferAccessImpl<T> {
617-
fn get_interface() -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
651+
pub(crate) fn get_interface() -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
618652
const INTERFACE_MAP: OnceLock<Mutex<HashMap<
619653
TypeId,
620654
&'static (dyn AnyBufferAccessInterface + Send + Sync)
@@ -655,6 +689,14 @@ impl<T: 'static + Send + Sync + Any> AnyBufferAccessInterface for AnyBufferAcces
655689
entity_mut.ensure_session::<T>(session)
656690
}
657691

692+
fn pull(
693+
&self,
694+
entity_mut: &mut EntityWorldMut,
695+
session: Entity,
696+
) -> Result<AnyMessage, OperationError> {
697+
entity_mut.pull_from_buffer::<T>(session).map(to_any_message)
698+
}
699+
658700
fn create_any_buffer_access_mut_state(
659701
&self,
660702
world: &mut World,
@@ -685,6 +727,86 @@ impl<T: 'static + Send + Sync + Any> DrainAnyBufferInterface for DrainBuffer<'_,
685727
}
686728
}
687729

730+
impl Bufferable for AnyBuffer {
731+
type BufferType = Self;
732+
fn into_buffer(self, builder: &mut Builder) -> Self::BufferType {
733+
assert_eq!(self.scope, builder.scope());
734+
self
735+
}
736+
}
737+
738+
impl Buffered for AnyBuffer {
739+
fn verify_scope(&self, scope: Entity) {
740+
assert_eq!(scope, self.scope);
741+
}
742+
743+
fn buffered_count(&self, session: Entity, world: &World) -> Result<usize, OperationError> {
744+
let entity_ref = world.get_entity(self.source).or_broken()?;
745+
self.interface.buffered_count(&entity_ref, session)
746+
}
747+
748+
fn add_listener(&self, listener: Entity, world: &mut World) -> OperationResult {
749+
add_listener_to_source(self.source, listener, world)
750+
}
751+
752+
fn gate_action(
753+
&self,
754+
session: Entity,
755+
action: Gate,
756+
world: &mut World,
757+
roster: &mut OperationRoster,
758+
) -> OperationResult {
759+
GateState::apply(self.source, session, action, world, roster)
760+
}
761+
762+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
763+
SmallVec::from_iter([self.source])
764+
}
765+
766+
fn ensure_active_session(&self, session: Entity, world: &mut World) -> OperationResult {
767+
let mut entity_mut = world.get_entity_mut(self.source).or_broken()?;
768+
self.interface.ensure_session(&mut entity_mut, session)
769+
}
770+
}
771+
772+
impl Joined for AnyBuffer {
773+
type Item = AnyMessage;
774+
fn pull(&self, session: Entity, world: &mut World) -> Result<Self::Item, OperationError> {
775+
let mut buffer_mut = world.get_entity_mut(self.source).or_broken()?;
776+
self.interface.pull(&mut buffer_mut, session)
777+
}
778+
}
779+
780+
impl Accessed for AnyBuffer {
781+
type Key = AnyBufferKey;
782+
fn add_accessor(&self, accessor: Entity, world: &mut World) -> OperationResult {
783+
world
784+
.get_mut::<BufferAccessors>(self.source)
785+
.or_broken()?
786+
.add_accessor(accessor);
787+
Ok(())
788+
}
789+
790+
fn create_key(&self, builder: &super::BufferKeyBuilder) -> Self::Key {
791+
let components = builder.as_components(self.source);
792+
AnyBufferKey {
793+
buffer: components.buffer,
794+
session: components.session,
795+
accessor: components.accessor,
796+
lifecycle: components.lifecycle,
797+
interface: self.interface,
798+
}
799+
}
800+
801+
fn deep_clone_key(key: &Self::Key) -> Self::Key {
802+
key.deep_clone()
803+
}
804+
805+
fn is_key_in_use(key: &Self::Key) -> bool {
806+
key.is_in_use()
807+
}
808+
}
809+
688810
#[cfg(test)]
689811
mod tests {
690812
use bevy_ecs::prelude::World;
@@ -854,4 +976,53 @@ mod tests {
854976
.collect()
855977
}).unwrap()
856978
}
979+
980+
#[test]
981+
fn double_any_messages() {
982+
let mut context = TestingContext::minimal_plugins();
983+
984+
let workflow = context.spawn_io_workflow(|scope: Scope<(u32, i32, f32), (u32, i32, f32)>, builder| {
985+
let buffer_u32: AnyBuffer = builder.create_buffer::<u32>(BufferSettings::default()).into();
986+
let buffer_i32: AnyBuffer = builder.create_buffer::<i32>(BufferSettings::default()).into();
987+
let buffer_f32: AnyBuffer = builder.create_buffer::<f32>(BufferSettings::default()).into();
988+
989+
let (input_u32, input_i32, input_f32) = scope.input.chain(builder).unzip();
990+
input_u32
991+
.chain(builder)
992+
.map_block(|v| 2*v)
993+
.connect(buffer_u32.downcast::<u32>().unwrap().input_slot());
994+
995+
input_i32
996+
.chain(builder)
997+
.map_block(|v| 2*v)
998+
.connect(buffer_i32.downcast::<i32>().unwrap().input_slot());
999+
1000+
input_f32
1001+
.chain(builder)
1002+
.map_block(|v| 2.0*v)
1003+
.connect(buffer_f32.downcast::<f32>().unwrap().input_slot());
1004+
1005+
(buffer_u32, buffer_i32, buffer_f32)
1006+
.join(builder)
1007+
.map_block(|(value_u32, value_i32, value_f32)| {
1008+
(
1009+
*value_u32.downcast::<u32>().unwrap(),
1010+
*value_i32.downcast::<i32>().unwrap(),
1011+
*value_f32.downcast::<f32>().unwrap(),
1012+
)
1013+
})
1014+
.connect(scope.terminate);
1015+
});
1016+
1017+
let mut promise = context.command(
1018+
|commands| commands.request((1u32, 2i32, 3f32), workflow).take_response()
1019+
);
1020+
1021+
context.run_with_conditions(&mut promise, Duration::from_secs(2));
1022+
let (v_u32, v_i32, v_f32) = promise.take().available().unwrap();
1023+
assert_eq!(v_u32, 2);
1024+
assert_eq!(v_i32, 4);
1025+
assert_eq!(v_f32, 6.0);
1026+
assert!(context.no_unhandled_errors());
1027+
}
8571028
}

src/buffer/buffer_key_builder.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ pub struct BufferKeyBuilder {
2828
lifecycle: Option<(ChannelSender, Arc<()>)>,
2929
}
3030

31+
pub struct BufferKeyComponents {
32+
pub buffer: Entity,
33+
pub session: Entity,
34+
pub accessor: Entity,
35+
pub lifecycle: Option<Arc<BufferAccessLifecycle>>,
36+
}
37+
3138
impl BufferKeyBuilder {
3239
pub(crate) fn build<T>(&self, buffer: Entity) -> BufferKey<T> {
3340
BufferKey {
@@ -48,6 +55,26 @@ impl BufferKeyBuilder {
4855
}
4956
}
5057

58+
// TODO(@mxgrey): Consider refactoring all the buffer key structs to use a
59+
// single inner struct like BufferKeyComponents
60+
pub(crate) fn as_components(&self, buffer: Entity) -> BufferKeyComponents {
61+
BufferKeyComponents {
62+
buffer,
63+
session: self.session,
64+
accessor: self.accessor,
65+
lifecycle: self.lifecycle.as_ref().map(|(sender, tracker)| {
66+
Arc::new(BufferAccessLifecycle::new(
67+
self.scope,
68+
buffer,
69+
self.session,
70+
self.accessor,
71+
sender.clone(),
72+
tracker.clone(),
73+
))
74+
}),
75+
}
76+
}
77+
5178
pub(crate) fn with_tracking(
5279
scope: Entity,
5380
session: Entity,

0 commit comments

Comments
 (0)