Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 9 additions & 23 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use crate::event::Event;
use crate::event_store::EventStreamId;
use std::fmt::Debug;

pub trait Command<E: Event>: Clone {
type State: AggregateState<E>;
pub trait Command: Clone {
type Event: Event;
type State: AggregateState<Self::Event>;
type Error: std::error::Error + Send + Sync + 'static;

fn handle(&self) -> Result<Vec<E>, Self::Error>;
fn handle(&self) -> Result<Vec<Self::Event>, Self::Error>;

fn event_stream_id(&self) -> EventStreamId;

fn get_state(&self) -> Self::State;

fn set_state(&self, state: Self::State) -> Self;
fn set_state(&mut self, state: Self::State);

fn mark_retry(&self) -> Self
where
Expand All @@ -26,33 +27,18 @@ pub trait Command<E: Event>: Clone {
None
}

fn apply(&mut self, event: E) -> Self
fn apply(&mut self, event: &Self::Event)
where
Self: Sized,
{
self.set_state(self.get_state().apply(event))
self.set_state(self.get_state().apply(event));
}
}

impl<E: Event> Command<E> for () {
type State = ();
type Error = std::convert::Infallible;

fn handle(&self) -> Result<Vec<E>, Self::Error> {
Ok(vec![])
}
fn event_stream_id(&self) -> EventStreamId {
EventStreamId::new()
}
fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {}
fn mark_retry(&self) -> Self {}
}

pub trait AggregateState<E: Event>: Debug + Sized {
fn apply(&self, event: E) -> Self;
fn apply(&self, event: &E) -> Self;
}

impl<E: Event> AggregateState<E> for () {
fn apply(&self, _: E) {}
fn apply(&self, _: &E) {}
}
34 changes: 16 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn execute<E, C, S>(
) -> Result<(), Error>
where
E: Event,
C: Command<E>,
C: Command<Event = E>,
S: EventStore,
{
let mut retries = 0;
Expand All @@ -45,7 +45,7 @@ where

Ok(mut event_stream) => {
while let Some((event, version)) = event_stream.next().await? {
command = command.apply(event);
command.apply(&event);
expected_version = Some(version);
}
}
Expand Down Expand Up @@ -168,14 +168,13 @@ mod tests {
}
}

impl Command<TestEvent> for AlwaysConflictingCommand {
impl Command for AlwaysConflictingCommand {
type Event = TestEvent;
type State = ();
type Error = Error;

fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {
(*self).clone()
}
fn set_state(&mut self, _: Self::State) {}
fn event_stream_id(&self) -> EventStreamId {
EventStreamId(self.id)
}
Expand Down Expand Up @@ -326,18 +325,17 @@ mod tests {
state: StatefulCommandState,
}

impl Command<TestEvent> for ConcurrentModificationCommand {
impl Command for ConcurrentModificationCommand {
type Event = TestEvent;
type State = StatefulCommandState;
type Error = Error;

fn get_state(&self) -> Self::State {
self.state.clone()
}

fn set_state(&self, state: Self::State) -> Self {
let mut new = (*self).clone();
new.state = state;
new
fn set_state(&mut self, state: Self::State) {
self.state = state;
}

fn event_stream_id(&self) -> EventStreamId {
Expand Down Expand Up @@ -380,14 +378,14 @@ mod tests {
}

impl AggregateState<TestEvent> for StatefulCommandState {
fn apply(&self, event: TestEvent) -> Self {
fn apply(&self, event: &TestEvent) -> Self {
match event {
TestEvent::FooHappened { value, .. } => Self {
foo: Some(value),
foo: Some(*value),
..*self
},
TestEvent::BarHappened { value, .. } => Self {
bar: Some(value),
bar: Some(*value),
..*self
},
_ => Self { ..*self },
Expand Down Expand Up @@ -463,7 +461,8 @@ mod tests {
id: Uuid,
}

impl Command<TestEvent> for EventProducingCommand {
impl Command for EventProducingCommand {
type Event = TestEvent;
type State = ();
type Error = Infallible;

Expand All @@ -477,10 +476,9 @@ mod tests {
EventStreamId(self.id)
}
fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {
(*self).clone()
}
fn set_state(&mut self, _: Self::State) {}
}

#[tokio::test]
async fn read_error_returned_from_execute() {
let mut event_store = create_invalid_test_store();
Expand Down
36 changes: 16 additions & 20 deletions tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ impl NoopCommand {
}
}

impl Command<()> for NoopCommand {
impl Command for NoopCommand {
type Event = ();
type State = ();
type Error = Infallible;

Expand All @@ -32,9 +33,7 @@ impl Command<()> for NoopCommand {
EventStreamId(self.id)
}
fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {
(*self).clone()
}
fn set_state(&mut self, _: Self::State) {}
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -52,7 +51,8 @@ impl RejectCommand {
}
}

impl Command<()> for RejectCommand {
impl Command for RejectCommand {
type Event = ();
type State = ();
type Error = RejectCommandError;

Expand All @@ -63,9 +63,7 @@ impl Command<()> for RejectCommand {
EventStreamId(self.id)
}
fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {
(*self).clone()
}
fn set_state(&mut self, _: Self::State) {}
}

#[derive(Clone)]
Expand All @@ -79,7 +77,8 @@ impl EventProducingCommand {
}
}

impl Command<TestEvent> for EventProducingCommand {
impl Command for EventProducingCommand {
type Event = TestEvent;
type State = ();
type Error = Infallible;

Expand All @@ -93,9 +92,7 @@ impl Command<TestEvent> for EventProducingCommand {
EventStreamId(self.id)
}
fn get_state(&self) -> Self::State {}
fn set_state(&self, _: Self::State) -> Self {
(*self).clone()
}
fn set_state(&mut self, _: Self::State) {}
}

#[derive(Clone, Debug)]
Expand All @@ -105,14 +102,14 @@ pub struct StatefulCommandState {
}

impl AggregateState<TestEvent> for StatefulCommandState {
fn apply(&self, event: TestEvent) -> Self {
fn apply(&self, event: &TestEvent) -> Self {
match event {
TestEvent::FooHappened { value, .. } => Self {
foo: Some(value),
foo: Some(*value),
..*self
},
TestEvent::BarHappened { value, .. } => Self {
bar: Some(value),
bar: Some(*value),
..*self
},
_ => Self { ..*self },
Expand All @@ -138,18 +135,17 @@ impl StatefulCommand {
}
}

impl Command<TestEvent> for StatefulCommand {
impl Command for StatefulCommand {
type Event = TestEvent;
type State = StatefulCommandState;
type Error = Infallible;

fn get_state(&self) -> Self::State {
self.state.clone()
}

fn set_state(&self, state: Self::State) -> Self {
let mut new = (*self).clone();
new.state = state;
new
fn set_state(&mut self, state: Self::State) {
self.state = state;
}

fn event_stream_id(&self) -> EventStreamId {
Expand Down