feat: restart poisoned streams instead of leaving corrupted state#262
feat: restart poisoned streams instead of leaving corrupted state#262
Conversation
| hydrated atomic.Bool | ||
| recordFailEvent func(failEvent) | ||
| subscribe func(*http.Request, ...eventsource.StreamOption) (*eventsource.Stream, error) | ||
| restartStreamFn func() |
There was a problem hiding this comment.
Don't see a way to test this behavior nicely without introducing this seam. Not a huge fan but I also don't think it's a disaster
| cancel context.CancelFunc | ||
|
|
||
| streamMu sync.Mutex | ||
| streamCancel context.CancelFunc |
There was a problem hiding this comment.
We now have stream level cancellation, which allows us to yeet the stream and start a new one without breaking the struct's guarantees
| } | ||
| } | ||
| case <-sc.ctx.Done(): | ||
| case <-streamCtx.Done(): |
There was a problem hiding this comment.
Run stream now binds to a scoped context. If that context dies the reader dies. Before this was struct level, meaning this could only be done once
gastonfournier
left a comment
There was a problem hiding this comment.
Just one relatively minor comment (it's an edge case)
| sc.streamMu.Lock() | ||
| if sc.streamCancel != nil { | ||
| sc.streamCancel() | ||
| } | ||
| sc.streamMu.Unlock() |
There was a problem hiding this comment.
Seems to be the common practice. It should be fine, but I'm thinking... what if streamCancel() fails... will it remain locked?
| sc.streamMu.Lock() | |
| if sc.streamCancel != nil { | |
| sc.streamCancel() | |
| } | |
| sc.streamMu.Unlock() | |
| sc.streamMu.Lock() | |
| defer sc.streamMu.Unlock() | |
| if sc.streamCancel != nil { | |
| sc.streamCancel() | |
| } |
This terminates a stream and restarts it in the case of poisoned events. In theory this should never happen, however, if for some reason the SDK receives a domain event it cannot process then continuing on is not safe - a rehydration from scratch is necessary to maintain data integrity