Skip to content

Commit 6a06819

Browse files
author
anakin_karrot
committed
Test
1 parent 116de18 commit 6a06819

File tree

7 files changed

+152
-11
lines changed

7 files changed

+152
-11
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import { produceEffects } from "produceEffects";
2+
import { aggregate } from "../aggregate";
3+
import type { Effect } from "../Effect";
4+
import type {
5+
DomainEvent,
6+
PoppedEvent,
7+
PushedEvent,
8+
ReplacedEvent,
9+
} from "../event-types";
10+
import type { Stack } from "../Stack";
11+
import type { Publisher } from "../utils/Publisher/Publisher";
12+
import type { Aggregator } from "./Aggregator";
13+
14+
export class SyncAggregator implements Aggregator {
15+
private events: DomainEvent[];
16+
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
17+
private autoUpdateTask: DynamicallyScheduledTask;
18+
private previousStack: Stack;
19+
20+
constructor(
21+
events: DomainEvent[],
22+
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>,
23+
) {
24+
this.events = events;
25+
this.changePublisher = changePublisher;
26+
this.autoUpdateTask = new DynamicallyScheduledTask(async () => {
27+
this.updateStack();
28+
});
29+
this.previousStack = this.computeStack();
30+
}
31+
32+
getStack(): Stack {
33+
return this.computeStack();
34+
}
35+
36+
dispatchEvent(event: DomainEvent): void {
37+
this.events.push(event);
38+
this.updateStack();
39+
}
40+
41+
subscribeChanges(
42+
listener: (effects: Effect[], stack: Stack) => void,
43+
): () => void {
44+
return this.changePublisher.subscribe(({ effects, stack }) => {
45+
listener(effects, stack);
46+
});
47+
}
48+
49+
private computeStack(): Stack {
50+
return aggregate(this.events, Date.now());
51+
}
52+
53+
private predictUpcomingTransitionStateUpdate(): {
54+
event: DomainEvent;
55+
timestamp: number;
56+
} | null {
57+
const stack = this.computeStack();
58+
const activeActivities = stack.activities.filter(
59+
(activity) =>
60+
activity.transitionState === "enter-active" ||
61+
activity.transitionState === "exit-active",
62+
);
63+
const mostRecentlyActivatedActivity = activeActivities.sort(
64+
(a, b) => b.estimatedTransitionEnd! - a.estimatedTransitionEnd!,
65+
)[0];
66+
67+
return mostRecentlyActivatedActivity
68+
? {
69+
event:
70+
mostRecentlyActivatedActivity.exitedBy ??
71+
mostRecentlyActivatedActivity.enteredBy,
72+
timestamp: mostRecentlyActivatedActivity.estimatedTransitionEnd!,
73+
}
74+
: null;
75+
}
76+
77+
private updateStack(): void {
78+
const previousStack = this.previousStack;
79+
const currentStack = this.computeStack();
80+
const effects = produceEffects(previousStack, currentStack);
81+
82+
if (effects.length > 0) {
83+
this.changePublisher.publish({ effects, stack: currentStack });
84+
85+
this.previousStack = currentStack;
86+
87+
const upcomingTransitionStateUpdate =
88+
this.predictUpcomingTransitionStateUpdate();
89+
90+
if (upcomingTransitionStateUpdate) {
91+
this.autoUpdateTask.schedule(upcomingTransitionStateUpdate.timestamp);
92+
}
93+
}
94+
}
95+
}
96+
97+
class DynamicallyScheduledTask {
98+
private task: () => Promise<void>;
99+
private scheduleId: number | null;
100+
101+
constructor(task: () => Promise<void>) {
102+
this.task = task;
103+
this.scheduleId = null;
104+
}
105+
106+
schedule(timestamp: number): void {
107+
if (this.scheduleId !== null) {
108+
clearTimeout(this.scheduleId);
109+
this.scheduleId = null;
110+
}
111+
112+
const timeoutId = setTimeout(
113+
() => {
114+
if (this.scheduleId !== timeoutId) return;
115+
if (Date.now() < timestamp) {
116+
this.schedule(timestamp);
117+
return;
118+
}
119+
120+
this.task();
121+
this.scheduleId = null;
122+
},
123+
Math.max(0, timestamp - Date.now()),
124+
);
125+
126+
this.scheduleId = timeoutId;
127+
}
128+
}

core/src/Stack.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export type Activity = {
2929
id: string;
3030
name: string;
3131
transitionState: ActivityTransitionState;
32+
estimatedTransitionEnd?: number;
3233
params: {
3334
[key: string]: string | undefined;
3435
};

core/src/activity-utils/makeActivitiesReducer.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export function makeActivitiesReducer({
2727
Pushed(activities: Activity[], event: PushedEvent): Activity[] {
2828
const isTransitionDone =
2929
now - (resumedAt ?? event.eventDate) >= transitionDuration;
30+
const estimatedTransitionEnd =
31+
(resumedAt ?? event.eventDate) + transitionDuration;
3032

3133
const transitionState: ActivityTransitionState =
3234
event.skipEnterActiveState || isTransitionDone
@@ -37,7 +39,7 @@ export function makeActivitiesReducer({
3739

3840
return [
3941
...activities.slice(0, reservedIndex),
40-
makeActivityFromEvent(event, transitionState),
42+
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
4143
...activities.slice(reservedIndex + 1),
4244
];
4345
},
@@ -48,6 +50,8 @@ export function makeActivitiesReducer({
4850
Replaced(activities: Activity[], event: ReplacedEvent): Activity[] {
4951
const isTransitionDone =
5052
now - (resumedAt ?? event.eventDate) >= transitionDuration;
53+
const estimatedTransitionEnd =
54+
(resumedAt ?? event.eventDate) + transitionDuration;
5155

5256
const reservedIndex = findNewActivityIndex(activities, event);
5357

@@ -60,7 +64,7 @@ export function makeActivitiesReducer({
6064

6165
return [
6266
...activities.slice(0, reservedIndex),
63-
makeActivityFromEvent(event, transitionState),
67+
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
6468
...activities.slice(reservedIndex + 1),
6569
];
6670
},

core/src/activity-utils/makeActivityFromEvent.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import type { Activity, ActivityTransitionState } from "../Stack";
44
export function makeActivityFromEvent(
55
event: PushedEvent | ReplacedEvent,
66
transitionState: ActivityTransitionState,
7+
estimatedTransitionEnd: number,
78
): Activity {
89
return {
910
id: event.activityId,
1011
name: event.activityName,
1112
transitionState,
13+
estimatedTransitionEnd,
1214
params: event.activityParams,
1315
context: event.activityContext,
1416
steps: [

core/src/activity-utils/makeActivityReducer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ export function makeActivityReducer(context: {
4949
...activity,
5050
exitedBy: event,
5151
transitionState,
52+
estimatedTransitionEnd:
53+
(context.resumedAt ?? event.eventDate) + context.transitionDuration,
5254
params:
5355
transitionState === "exit-done"
5456
? activity.steps[0].params

core/src/makeCoreStore.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
import isEqual from "react-fast-compare";
1+
import { ExclusiveTaskQueue } from "utils/TaskQueue/ExclusiveTaskQueue";
22
import type { Aggregator } from "./Aggregator/Aggregator";
3-
import { aggregate } from "./aggregate";
3+
import { SyncAggregator } from "./Aggregator/SyncAggregator";
44
import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types";
55
import { makeEvent } from "./event-utils";
66
import type { StackflowActions, StackflowPlugin } from "./interfaces";
7-
import { produceEffects } from "./produceEffects";
8-
import type { Stack } from "./Stack";
97
import { divideBy, once } from "./utils";
108
import { makeActions } from "./utils/makeActions";
9+
import { QueuingPublisher } from "./utils/Publisher/QueuingPublisher";
1110
import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks";
1211

1312
const SECOND = 1000;
@@ -77,7 +76,10 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
7776
options.handlers?.onInitialActivityNotFound?.();
7877
}
7978

80-
const aggregator: Aggregator = undefined as any;
79+
const aggregator: Aggregator = new SyncAggregator(
80+
[...initialRemainingEvents, ...initialPushedEvents],
81+
new QueuingPublisher(new ExclusiveTaskQueue()),
82+
);
8183

8284
aggregator.subscribeChanges((effects) => {
8385
triggerPostEffectHooks(effects, pluginInstances, actions);

core/src/produceEffects.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] {
99

1010
const somethingChanged = !isEqual(prevOutput, nextOutput);
1111

12-
if (somethingChanged) {
13-
output.push({
14-
_TAG: "%SOMETHING_CHANGED%",
15-
});
12+
if (!somethingChanged) {
13+
return output;
1614
}
1715

16+
output.push({
17+
_TAG: "%SOMETHING_CHANGED%",
18+
});
19+
1820
const isPaused =
1921
prevOutput.globalTransitionState !== "paused" &&
2022
nextOutput.globalTransitionState === "paused";

0 commit comments

Comments
 (0)