-
Notifications
You must be signed in to change notification settings - Fork 78
Implement replay_event_log_async
#1266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
As we now have AsyncSessionPersister to enable async persistence, we need to be able to replay the event log asynchronously too.
Pull Request Test Coverage Report for Build 21076949429Details
💛 - Coveralls |
arminsabouri
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cACK be7d040
Just had one question. Otherwise code looks good.
|
|
||
| for event in logs { | ||
| session_events.push(event.clone()); | ||
| receiver = receiver.process_event(event)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the previous code if process_event failed we close the session. Is this a regression?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks the close was move to the replay_events call side. Is that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replay_events does no IO so that it can be reused in the async version of replay_event_log. So it just returns an error here and that's handled with a session close in replay_event_log/_async:
let (receiver, session_events) = match replay_events(logs.map(|e| e.into())) {
Ok(r) => r,
Err(e) => {
persister.close().await.map_err(|ce| {
InternalReplayError::PersistenceFailure(ImplementationError::new(ce))
})?;
return Err(e);
}
};There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a second commit that adds test coverage for this scenario to confirm no regression
When `process_event` fails we close the session. This adds test coverage for that scenario.
As we now have
AsyncSessionPersisterto enable a sync persistence (#1235), we need to be able to replay the event log asynchronously too.There's still considerable duplication between the
replay_eventsfunctions on the send and receive sides, but I'm not convinced whether we should refactor further by introducing aSessionEventtrait that both the receive and senderSessionEventtypes could both implement (and the same on Receive/SendSession).Pull Request Checklist
Please confirm the following before requesting review:
AI
in the body of this PR.