File tree Expand file tree Collapse file tree 1 file changed +15
-13
lines changed
core/shared/src/main/scala/fs2/concurrent Expand file tree Collapse file tree 1 file changed +15
-13
lines changed Original file line number Diff line number Diff line change @@ -171,19 +171,21 @@ object Topic {
171171 }
172172
173173 val action = sends.flatMap { allSucceeded =>
174- state.flatModify {
175- case s @ State .Active (subs, _, n, closing) =>
176- val dec = n - 1
177- if (dec == 0 && closing) {
178- val closeAction = foreach(subs)(_.close.void)
179- (State .Closed (), closeAction >> publishersFinished.complete(()).void)
180- } else {
181- (s.copy(publishing = dec), F .unit)
182- }
183- case s @ State .Closed () => (s, F .unit)
184- }.map { _ =>
185- if (allSucceeded) Topic .rightUnit else Topic .closed
186- }
174+ state
175+ .flatModify {
176+ case s @ State .Active (subs, _, n, closing) =>
177+ val dec = n - 1
178+ if (dec == 0 && closing) {
179+ val closeAction = foreach(subs)(_.close.void)
180+ (State .Closed (), closeAction >> publishersFinished.complete(()).void)
181+ } else {
182+ (s.copy(publishing = dec), F .unit)
183+ }
184+ case s @ State .Closed () => (s, F .unit)
185+ }
186+ .map { _ =>
187+ if (allSucceeded) Topic .rightUnit else Topic .closed
188+ }
187189 }
188190 (newState, action)
189191
You can’t perform that action at this time.
0 commit comments