Skip to content

Commit 197475c

Browse files
refactor
1 parent fdd6d0d commit 197475c

File tree

4 files changed

+84
-44
lines changed

4 files changed

+84
-44
lines changed
Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,37 @@
1-
import { produceEffects } from "produceEffects";
21
import { aggregate } from "../aggregate";
32
import type { Effect } from "../Effect";
43
import type { DomainEvent } from "../event-types";
4+
import { produceEffects } from "../produceEffects";
55
import type { Stack } from "../Stack";
6+
import { delay } from "../utils/delay";
67
import type { Publisher } from "../utils/publishers/Publisher";
78
import type { Scheduler } from "../utils/schedulers/Scheduler";
89
import type { Aggregator } from "./Aggregator";
910

1011
export class SyncAggregator implements Aggregator {
1112
private events: DomainEvent[];
13+
private latestStackSnapshot: Stack;
1214
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
1315
private updateScheduler: Scheduler;
14-
private previousStack: Stack;
1516

16-
constructor(
17-
events: DomainEvent[],
18-
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>,
19-
updateScheduler: Scheduler,
20-
) {
21-
this.events = events;
22-
this.changePublisher = changePublisher;
23-
this.updateScheduler = updateScheduler;
24-
this.previousStack = this.computeStack();
17+
constructor(options: {
18+
initialEvents: DomainEvent[];
19+
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
20+
updateScheduler: Scheduler;
21+
}) {
22+
this.events = options.initialEvents;
23+
this.latestStackSnapshot = aggregate(this.events, Date.now());
24+
this.changePublisher = options.changePublisher;
25+
this.updateScheduler = options.updateScheduler;
2526
}
2627

2728
getStack(): Stack {
28-
return this.previousStack;
29+
return this.latestStackSnapshot;
2930
}
3031

3132
dispatchEvent(event: DomainEvent): void {
3233
this.events.push(event);
33-
this.updateStack();
34+
this.updateSnapshot();
3435
}
3536

3637
subscribeChanges(
@@ -41,15 +42,41 @@ export class SyncAggregator implements Aggregator {
4142
});
4243
}
4344

44-
private computeStack(): Stack {
45-
return aggregate(this.events, Date.now());
45+
private updateSnapshot(): void {
46+
const previousSnapshot = this.latestStackSnapshot;
47+
const currentSnapshot = aggregate(this.events, Date.now());
48+
const effects = produceEffects(previousSnapshot, currentSnapshot);
49+
50+
if (effects.length > 0) {
51+
this.latestStackSnapshot = currentSnapshot;
52+
this.changePublisher.publish({
53+
effects,
54+
stack: this.latestStackSnapshot,
55+
});
56+
}
57+
58+
const earliestUpcomingTransitionStateUpdate =
59+
this.calculateEarliestUpcomingTransitionStateUpdate();
60+
61+
if (earliestUpcomingTransitionStateUpdate) {
62+
this.updateScheduler.schedule(async (options) => {
63+
await delay(
64+
earliestUpcomingTransitionStateUpdate.timestamp - Date.now(),
65+
{ signal: options?.signal },
66+
);
67+
68+
if (options?.signal?.aborted) return;
69+
70+
this.updateSnapshot();
71+
});
72+
}
4673
}
4774

48-
private predictUpcomingTransitionStateUpdate(): {
75+
private calculateEarliestUpcomingTransitionStateUpdate(): {
4976
event: DomainEvent;
5077
timestamp: number;
5178
} | null {
52-
const activeActivities = this.previousStack.activities.filter(
79+
const activeActivities = this.latestStackSnapshot.activities.filter(
5380
(activity) =>
5481
activity.transitionState === "enter-active" ||
5582
activity.transitionState === "exit-active",
@@ -67,25 +94,4 @@ export class SyncAggregator implements Aggregator {
6794
}
6895
: null;
6996
}
70-
71-
private updateStack(): void {
72-
const previousStack = this.previousStack;
73-
const currentStack = this.computeStack();
74-
const effects = produceEffects(previousStack, currentStack);
75-
76-
if (effects.length > 0) {
77-
this.changePublisher.publish({ effects, stack: currentStack });
78-
79-
this.previousStack = currentStack;
80-
81-
const upcomingTransitionStateUpdate =
82-
this.predictUpcomingTransitionStateUpdate();
83-
84-
if (upcomingTransitionStateUpdate) {
85-
this.updateScheduler.schedule(async () => {
86-
this.updateStack();
87-
});
88-
}
89-
}
90-
}
9197
}

core/src/makeCoreStore.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,13 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
7373
options.handlers?.onInitialActivityNotFound?.();
7474
}
7575

76-
const aggregator: Aggregator = new SyncAggregator(
77-
[...initialRemainingEvents, ...initialPushedEvents],
78-
new ScheduledPublisher(new SequentialScheduler(new Mutex())),
79-
new SwitchScheduler(new Mutex()),
80-
);
76+
const aggregator: Aggregator = new SyncAggregator({
77+
initialEvents: [...initialRemainingEvents, ...initialPushedEvents],
78+
changePublisher: new ScheduledPublisher(
79+
new SequentialScheduler(new Mutex()),
80+
),
81+
updateScheduler: new SwitchScheduler(new Mutex()),
82+
});
8183

8284
aggregator.subscribeChanges((effects) => {
8385
triggerPostEffectHooks(effects, pluginInstances, actions);

core/src/utils/Mutex.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ export class Mutex {
1515
reject(getAbortReason(signal));
1616
};
1717
const lockWaiter = (lockHandle: LockHandle) => {
18-
resolve(lockHandle);
18+
if (signal?.aborted) {
19+
reject(getAbortReason(signal));
20+
lockHandle.release();
21+
} else {
22+
resolve(lockHandle);
23+
}
1924

2025
signal?.removeEventListener("abort", abortHandler);
2126
};

core/src/utils/delay.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { getAbortReason } from "./getAbortReason";
2+
3+
export function delay(
4+
ms: number,
5+
options?: { signal?: AbortSignal },
6+
): Promise<void> {
7+
return new Promise((resolve, reject) => {
8+
const signal = options?.signal;
9+
10+
if (signal?.aborted) throw getAbortReason(signal);
11+
12+
const abortHandler = () => {
13+
if (!signal) return;
14+
15+
clearTimeout(timeoutId);
16+
reject(getAbortReason(signal));
17+
};
18+
const timeoutId = setTimeout(() => {
19+
if (signal?.aborted) abortHandler();
20+
else resolve();
21+
22+
signal?.removeEventListener("abort", abortHandler);
23+
}, ms);
24+
25+
signal?.addEventListener("abort", abortHandler, { once: true });
26+
});
27+
}

0 commit comments

Comments
 (0)