diff --git a/src/command.rs b/src/command.rs index 16f1579..99a0808 100644 --- a/src/command.rs +++ b/src/command.rs @@ -3,17 +3,18 @@ use crate::event::Event; use crate::event_store::EventStreamId; use std::fmt::Debug; -pub trait Command: Clone { - type State: AggregateState; +pub trait Command: Clone { + type Event: Event; + type State: AggregateState; type Error: std::error::Error + Send + Sync + 'static; - fn handle(&self) -> Result, Self::Error>; + fn handle(&self) -> Result, Self::Error>; fn event_stream_id(&self) -> EventStreamId; fn get_state(&self) -> Self::State; - fn set_state(&self, state: Self::State) -> Self; + fn set_state(&mut self, state: Self::State); fn mark_retry(&self) -> Self where @@ -26,33 +27,18 @@ pub trait Command: Clone { None } - fn apply(&mut self, event: E) -> Self + fn apply(&mut self, event: &Self::Event) where Self: Sized, { - self.set_state(self.get_state().apply(event)) + self.set_state(self.get_state().apply(event)); } } -impl Command for () { - type State = (); - type Error = std::convert::Infallible; - - fn handle(&self) -> Result, Self::Error> { - Ok(vec![]) - } - fn event_stream_id(&self) -> EventStreamId { - EventStreamId::new() - } - fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self {} - fn mark_retry(&self) -> Self {} -} - pub trait AggregateState: Debug + Sized { - fn apply(&self, event: E) -> Self; + fn apply(&self, event: &E) -> Self; } impl AggregateState for () { - fn apply(&self, _: E) {} + fn apply(&self, _: &E) {} } diff --git a/src/lib.rs b/src/lib.rs index 42a307c..8cb454e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub async fn execute( ) -> Result<(), Error> where E: Event, - C: Command, + C: Command, S: EventStore, { let mut retries = 0; @@ -45,7 +45,7 @@ where Ok(mut event_stream) => { while let Some((event, version)) = event_stream.next().await? { - command = command.apply(event); + command.apply(&event); expected_version = Some(version); } } @@ -168,14 +168,13 @@ mod tests { } } - impl Command for AlwaysConflictingCommand { + impl Command for AlwaysConflictingCommand { + type Event = TestEvent; type State = (); type Error = Error; fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self { - (*self).clone() - } + fn set_state(&mut self, _: Self::State) {} fn event_stream_id(&self) -> EventStreamId { EventStreamId(self.id) } @@ -326,7 +325,8 @@ mod tests { state: StatefulCommandState, } - impl Command for ConcurrentModificationCommand { + impl Command for ConcurrentModificationCommand { + type Event = TestEvent; type State = StatefulCommandState; type Error = Error; @@ -334,10 +334,8 @@ mod tests { self.state.clone() } - fn set_state(&self, state: Self::State) -> Self { - let mut new = (*self).clone(); - new.state = state; - new + fn set_state(&mut self, state: Self::State) { + self.state = state; } fn event_stream_id(&self) -> EventStreamId { @@ -380,14 +378,14 @@ mod tests { } impl AggregateState for StatefulCommandState { - fn apply(&self, event: TestEvent) -> Self { + fn apply(&self, event: &TestEvent) -> Self { match event { TestEvent::FooHappened { value, .. } => Self { - foo: Some(value), + foo: Some(*value), ..*self }, TestEvent::BarHappened { value, .. } => Self { - bar: Some(value), + bar: Some(*value), ..*self }, _ => Self { ..*self }, @@ -463,7 +461,8 @@ mod tests { id: Uuid, } - impl Command for EventProducingCommand { + impl Command for EventProducingCommand { + type Event = TestEvent; type State = (); type Error = Infallible; @@ -477,10 +476,9 @@ mod tests { EventStreamId(self.id) } fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self { - (*self).clone() - } + fn set_state(&mut self, _: Self::State) {} } + #[tokio::test] async fn read_error_returned_from_execute() { let mut event_store = create_invalid_test_store(); diff --git a/tests/test_cases.rs b/tests/test_cases.rs index 37fcdcf..93ee6f3 100644 --- a/tests/test_cases.rs +++ b/tests/test_cases.rs @@ -21,7 +21,8 @@ impl NoopCommand { } } -impl Command<()> for NoopCommand { +impl Command for NoopCommand { + type Event = (); type State = (); type Error = Infallible; @@ -32,9 +33,7 @@ impl Command<()> for NoopCommand { EventStreamId(self.id) } fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self { - (*self).clone() - } + fn set_state(&mut self, _: Self::State) {} } #[derive(Debug, thiserror::Error)] @@ -52,7 +51,8 @@ impl RejectCommand { } } -impl Command<()> for RejectCommand { +impl Command for RejectCommand { + type Event = (); type State = (); type Error = RejectCommandError; @@ -63,9 +63,7 @@ impl Command<()> for RejectCommand { EventStreamId(self.id) } fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self { - (*self).clone() - } + fn set_state(&mut self, _: Self::State) {} } #[derive(Clone)] @@ -79,7 +77,8 @@ impl EventProducingCommand { } } -impl Command for EventProducingCommand { +impl Command for EventProducingCommand { + type Event = TestEvent; type State = (); type Error = Infallible; @@ -93,9 +92,7 @@ impl Command for EventProducingCommand { EventStreamId(self.id) } fn get_state(&self) -> Self::State {} - fn set_state(&self, _: Self::State) -> Self { - (*self).clone() - } + fn set_state(&mut self, _: Self::State) {} } #[derive(Clone, Debug)] @@ -105,14 +102,14 @@ pub struct StatefulCommandState { } impl AggregateState for StatefulCommandState { - fn apply(&self, event: TestEvent) -> Self { + fn apply(&self, event: &TestEvent) -> Self { match event { TestEvent::FooHappened { value, .. } => Self { - foo: Some(value), + foo: Some(*value), ..*self }, TestEvent::BarHappened { value, .. } => Self { - bar: Some(value), + bar: Some(*value), ..*self }, _ => Self { ..*self }, @@ -138,7 +135,8 @@ impl StatefulCommand { } } -impl Command for StatefulCommand { +impl Command for StatefulCommand { + type Event = TestEvent; type State = StatefulCommandState; type Error = Infallible; @@ -146,10 +144,8 @@ impl Command for StatefulCommand { self.state.clone() } - fn set_state(&self, state: Self::State) -> Self { - let mut new = (*self).clone(); - new.state = state; - new + fn set_state(&mut self, state: Self::State) { + self.state = state; } fn event_stream_id(&self) -> EventStreamId {