Skip to content

Commit 7598448

Browse files
committed
fix: queue events in transition to avoid multiple racing calls to check and the onMatch/onMismatch functions. Fixes when multiple events trigger transitions at nearly the same time
1 parent c4786f9 commit 7598448

File tree

2 files changed

+91
-11
lines changed

2 files changed

+91
-11
lines changed

packages/automation/src/lib/transitions/transition.spec.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
import { TimerListener } from '../listeners';
1+
import { ConstantListener, TimerListener } from '../listeners';
22
import { Transition } from './transition';
33

4+
function flushPromises() {
5+
return new Promise(jest.requireActual('timers').setImmediate);
6+
}
7+
48
function coalesce(value: number | undefined) {
59
return value ?? 0;
610
}
@@ -36,6 +40,8 @@ describe('Transition', () => {
3640

3741
// After 4 seconds (listener1 counter = 4, listener2 counter = 2)
3842
jest.advanceTimersByTime(4000);
43+
await flushPromises();
44+
3945
await expect(check).toHaveBeenCalledTimes(6);
4046
await expect(onMismatch).toHaveBeenCalledTimes(5); // 4 for listener1, 2 for listener2. But last one matched
4147
await expect(onMatch).toHaveBeenCalledTimes(1);
@@ -47,6 +53,8 @@ describe('Transition', () => {
4753

4854
// After 3 seconds (listener1 counter = 3, listener2 counter = 1)
4955
jest.advanceTimersByTime(3000);
56+
await flushPromises();
57+
5058
await expect(check).toHaveBeenCalledTimes(4);
5159
await expect(onMismatch).toHaveBeenCalledTimes(4); // 3 for listener1, 1 for listener2
5260
await expect(onMismatch).toHaveBeenCalledWith([3, 1]); // Last of failing values
@@ -56,8 +64,10 @@ describe('Transition', () => {
5664
it('should stop calling callbacks after stopListening', async () => {
5765
await transition.startListening();
5866

59-
// After 2 seconds
67+
// After 3 seconds
6068
jest.advanceTimersByTime(3000);
69+
await flushPromises();
70+
6171
await expect(check).toHaveBeenCalledTimes(4);
6272
await expect(onMismatch).toHaveBeenCalledTimes(4); // 3 for listener1, 1 for listener2
6373
await expect(onMismatch).toHaveBeenCalledWith([3, 1]); // Example of checking values
@@ -91,6 +101,8 @@ describe('Transition', () => {
91101

92102
// After 2 seconds (listener1 counter = 2, listener2 counter = 1)
93103
jest.advanceTimersByTime(2000);
104+
await flushPromises();
105+
94106
await expect(onMatch).toHaveBeenCalledTimes(3); // Called for each state change
95107
await expect(onMatch).toHaveBeenCalledWith([2, 1]);
96108
});
@@ -107,6 +119,44 @@ describe('Transition', () => {
107119
await expect(onMatch).toHaveBeenCalledWith([]);
108120
});
109121

122+
it('should handle multiple simultaneous listener updates and call onMatch only once when it stops listeners', async () => {
123+
const listener1 = new ConstantListener(1000);
124+
const listener2 = new ConstantListener(2000);
125+
const transition = new Transition({
126+
listeners: [listener1, listener2],
127+
check,
128+
onMatch,
129+
onMismatch,
130+
});
131+
// Overload onMatch
132+
const stoppingOnMatch = jest.fn(() => {
133+
transition.stopListening();
134+
});
135+
// @ts-expect-error overwriting a readonly property
136+
transition['onMatch'] = stoppingOnMatch;
137+
138+
await transition.startListening();
139+
140+
// Simulate rapid listener updates
141+
listener1['emit'](1);
142+
listener1['emit'](2);
143+
listener1['emit'](3);
144+
listener2['emit'](1);
145+
listener2['emit'](2); // This call should match. No more calls to anything after this
146+
listener2['emit'](2); // Since this event, transition doesn't call check more values
147+
listener2['emit'](2);
148+
listener1['emit'](3);
149+
listener1['emit'](3);
150+
151+
jest.runAllTimers();
152+
await flushPromises();
153+
154+
await expect(check).toHaveBeenCalledTimes(5); // Check should only be called once for each queued values
155+
await expect(onMismatch).toHaveBeenCalledTimes(4); // onMismatch should be called always until a match is found, but not more
156+
await expect(stoppingOnMatch).toHaveBeenCalledTimes(1); // onMatch should only be called once
157+
await expect(stoppingOnMatch).toHaveBeenCalledWith([3, 2]);
158+
});
159+
110160
afterEach(async () => {
111161
await transition.stopListening();
112162
jest.useRealTimers();

packages/automation/src/lib/transitions/transition.ts

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Listener } from '../listeners';
22

33
export type CheckFn = (values: (unknown | undefined)[]) => Promise<boolean>;
44
export type resultFn = (values: (unknown | undefined)[]) => Promise<void>;
5+
type Values = (unknown | undefined)[];
56

67
/**
78
* A Transition class that manages state transitions based on listeners and conditions.
@@ -17,10 +18,12 @@ export interface BaseTransitionParams {
1718
export class Transition {
1819
private readonly debug: boolean;
1920
private readonly listeners: Listener<unknown>[];
20-
private readonly values: (unknown | undefined)[];
21+
private readonly values: Values;
2122
private readonly check?: CheckFn;
2223
private readonly onMatch: resultFn;
2324
private readonly onMismatch?: resultFn;
25+
private readonly queue: Values[] = [];
26+
private isProcessingQueue = false;
2427

2528
/**
2629
* Creates a new Transition instance. If no listeners are provided, the transition will automatically match on the next event loop.
@@ -50,14 +53,12 @@ export class Transition {
5053
this.listeners.forEach((listener, index) => {
5154
listener.onStateChange(async (value: unknown) => {
5255
this.values[index] = value;
53-
const isMatch = this.check ? await this.check(this.values) : true;
54-
if (isMatch) {
55-
this.debug && console.log('match', this.values);
56-
await this.onMatch?.(this.values);
57-
} else {
58-
this.debug && console.log('mismatch', this.values);
59-
await this.onMismatch?.(this.values);
60-
}
56+
57+
// Enqueue the updated values
58+
this.queue.push([...this.values]);
59+
60+
// Process the queue
61+
await this.processQueue();
6162
});
6263
});
6364
}
@@ -83,6 +84,35 @@ export class Transition {
8384
*/
8485
async stopListening() {
8586
this.debug && console.log('stopListening');
87+
this.queue.length = 0; // Flush the queue as there might be more value arrays to check
8688
await Promise.all(this.listeners.map((listener) => listener.stop()));
8789
}
90+
91+
private async processQueue() {
92+
// Prevent concurrent queue processing
93+
if (this.isProcessingQueue) {
94+
return;
95+
}
96+
this.isProcessingQueue = true;
97+
98+
while (this.queue.length > 0) {
99+
const currentValues = this.queue.shift();
100+
101+
if (!currentValues) {
102+
continue;
103+
}
104+
105+
const isMatch = this.check ? await this.check(currentValues) : true;
106+
107+
if (isMatch) {
108+
this.debug && console.log('match', currentValues);
109+
await this.onMatch?.(currentValues);
110+
} else {
111+
this.debug && console.log('mismatch', currentValues);
112+
await this.onMismatch?.(currentValues);
113+
}
114+
}
115+
116+
this.isProcessingQueue = false; // Allow new queue processing
117+
}
88118
}

0 commit comments

Comments
 (0)