1+ import { head } from "lodash" ;
12import { SwitchScheduler } from "utils/schedulers/SwitchScheduler" ;
23import { aggregate } from "../aggregate" ;
34import type { Effect } from "../Effect" ;
@@ -8,32 +9,24 @@ import type { Stack } from "../Stack";
89import { delay } from "../utils/delay" ;
910import { getAbortReason } from "../utils/getAbortReason" ;
1011import type { Publisher } from "../utils/publishers/Publisher" ;
11- import type { Scheduler } from "../utils/schedulers/Scheduler" ;
1212import type { Aggregator } from "./Aggregator" ;
1313
1414export class SyncAggregator implements Aggregator {
1515 private changePublisher : Publisher < { effects : Effect [ ] ; stack : Stack } > ;
1616 private updateScheduler : SwitchScheduler ;
17+ private updateErrorReporter : ( error : unknown ) => void ;
1718 private events : DomainEvent [ ] ;
1819 private latestStackSnapshot : Stack | null ;
1920
20- private static UpdateOverridedError =
21- class UpdateOverridedError extends Error {
22- constructor ( ) {
23- super ( "a new update is scheduled" ) ;
24- }
25- } ;
26-
2721 constructor ( options : {
2822 changePublisher : Publisher < { effects : Effect [ ] ; stack : Stack } > ;
29- updateScheduler : Scheduler ;
23+ updateScheduler : SwitchScheduler ;
24+ updateErrorReporter : ( error : unknown ) => void ;
3025 initialEvents : DomainEvent [ ] ;
3126 } ) {
3227 this . changePublisher = options . changePublisher ;
33- this . updateScheduler = new SwitchScheduler ( {
34- SwitchException : SyncAggregator . UpdateOverridedError ,
35- scheduler : options . updateScheduler ,
36- } ) ;
28+ this . updateScheduler = options . updateScheduler ;
29+ this . updateErrorReporter = options . updateErrorReporter ;
3730 this . events = options . initialEvents ;
3831 this . latestStackSnapshot = null ;
3932 }
@@ -65,14 +58,19 @@ export class SyncAggregator implements Aggregator {
6558 }
6659
6760 private applyUpdate ( update : ( ) => void ) : void {
68- const previousSnapshot = this . readSnapshot ( ) ;
69-
70- update ( ) ;
71- this . latestStackSnapshot = aggregate ( this . events , Date . now ( ) ) ;
72- this . flushChanges (
73- produceEffects ( previousSnapshot , this . latestStackSnapshot ) ,
74- ) ;
75- this . scheduleTransitionStateUpdates ( ) ;
61+ try {
62+ const previousSnapshot = this . readSnapshot ( ) ;
63+ const projectionTime = Date . now ( ) ;
64+
65+ update ( ) ;
66+ this . latestStackSnapshot = aggregate ( this . events , projectionTime ) ;
67+ this . flushChanges (
68+ produceEffects ( previousSnapshot , this . latestStackSnapshot ) ,
69+ ) ;
70+ this . scheduleTransitionStateUpdates ( projectionTime ) ;
71+ } catch ( error ) {
72+ this . updateErrorReporter ( error ) ;
73+ }
7674 }
7775
7876 private flushChanges ( effects : Effect [ ] ) : void {
@@ -81,14 +79,16 @@ export class SyncAggregator implements Aggregator {
8179 this . changePublisher . publish ( { effects, stack : this . readSnapshot ( ) } ) ;
8280 }
8381
84- private scheduleTransitionStateUpdates ( ) : void {
82+ private scheduleTransitionStateUpdates ( projectionTime : number ) : void {
8583 const ongoingTransitions = projectToOngoingTransitions (
8684 this . events ,
87- Date . now ( ) ,
85+ projectionTime ,
86+ ) ;
87+ const nextToComplete = head (
88+ ongoingTransitions . sort (
89+ ( a , b ) => a . estimatedTransitionEnd - b . estimatedTransitionEnd ,
90+ ) ,
8891 ) ;
89- const nextToComplete = ongoingTransitions . sort (
90- ( a , b ) => a . estimatedTransitionEnd - b . estimatedTransitionEnd ,
91- ) [ 0 ] ;
9292
9393 if ( ! nextToComplete ) return ;
9494
@@ -104,12 +104,12 @@ export class SyncAggregator implements Aggregator {
104104 } )
105105 . catch ( ( error ) => {
106106 if (
107- error instanceof SyncAggregator . UpdateOverridedError ||
107+ error instanceof SwitchScheduler . SwitchException ||
108108 ( error instanceof DOMException && error . name === "AbortError" )
109109 )
110110 return ;
111111
112- throw error ;
112+ this . updateErrorReporter ( error ) ;
113113 } ) ;
114114 }
115115}
0 commit comments