Skip to content

Commit 171080c

Browse files
authored
Refactor to use middleware / subscriber internally (#1224)
1 parent bf86857 commit 171080c

File tree

14 files changed

+596
-154
lines changed

14 files changed

+596
-154
lines changed

.changeset/smart-pets-raise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@segment/analytics-signals': patch
3+
---
4+
5+
Refactor to use SignalEmitter middleware + subscriber interface internally

packages/signals/signals-integration-tests/src/helpers/base-page-object.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,9 @@ export class BasePage {
265265
}
266266

267267
waitForEdgeFunctionResponse(timeout = 30000) {
268-
return this.page.waitForResponse(
269-
`https://cdn.edgefn.segment.com/MY-WRITEKEY/**`,
270-
{ timeout }
271-
)
268+
return this.page.waitForResponse(this.edgeFnDownloadURL, {
269+
timeout,
270+
})
272271
}
273272

274273
async waitForCDNSettingsResponse(timeout = 30000) {
@@ -277,6 +276,4 @@ export class BasePage {
277276
{ timeout }
278277
)
279278
}
280-
281-
// Additional mock methods can be added here
282279
}
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
import { sleep } from '@segment/analytics-core'
2+
import { Signal } from '@segment/analytics-signals-runtime'
3+
import {
4+
SignalEmitter,
5+
SignalsMiddlewareContext,
6+
SignalsSubscriber,
7+
SignalsMiddleware,
8+
} from '../index'
9+
10+
const mockCtx = {
11+
unstableGlobalSettings: {},
12+
analyticsInstance: {},
13+
buffer: {},
14+
} as SignalsMiddlewareContext
15+
16+
describe(SignalEmitter, () => {
17+
let emitter: SignalEmitter
18+
let mockSubscriber: jest.Mocked<SignalsSubscriber>
19+
let mockSignal: Signal
20+
let mockMiddleware: jest.Mocked<SignalsMiddleware>
21+
22+
beforeEach(() => {
23+
emitter = new SignalEmitter()
24+
mockSubscriber = {
25+
load: jest.fn(),
26+
process: jest.fn(),
27+
}
28+
mockSignal = { type: 'test', data: {} } as any as Signal
29+
mockMiddleware = {
30+
load: jest.fn(),
31+
process: jest.fn().mockReturnValue(mockSignal),
32+
}
33+
})
34+
35+
it('should subscribe and unsubscribe a subscriber', async () => {
36+
emitter.subscribe(mockSubscriber)
37+
await emitter.start(mockCtx)
38+
39+
emitter.emit(mockSignal)
40+
await sleep(0)
41+
expect(mockSubscriber.process).toHaveBeenCalledTimes(1)
42+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
43+
await emitter.unsubscribe(mockSubscriber)
44+
45+
emitter.emit(mockSignal)
46+
47+
expect(mockSubscriber.process).toHaveBeenCalledTimes(1)
48+
})
49+
50+
it('should subscribe and unsubscribe if subscriber is a function', async () => {
51+
const mockSubscriber = jest.fn()
52+
emitter.subscribe(mockSubscriber)
53+
54+
emitter.emit(mockSignal)
55+
56+
expect(mockSubscriber).not.toHaveBeenCalledWith(mockSignal)
57+
58+
await emitter.start(mockCtx)
59+
60+
expect(mockSubscriber).toHaveBeenCalledTimes(1)
61+
expect(mockSubscriber).toHaveBeenCalledWith(mockSignal)
62+
await emitter.unsubscribe(mockSubscriber)
63+
64+
emitter.emit(mockSignal)
65+
66+
expect(mockSubscriber).toHaveBeenCalledTimes(1)
67+
})
68+
69+
it('should allow subscribing after start is called', async () => {
70+
await emitter.start(mockCtx)
71+
72+
const mockSubscriber = jest.fn()
73+
emitter.subscribe(mockSubscriber)
74+
75+
emitter.emit(mockSignal)
76+
expect(mockSubscriber).toHaveBeenCalledTimes(1)
77+
expect(mockSubscriber).toHaveBeenCalledWith(mockSignal)
78+
})
79+
80+
it('should handle multiple subscribers', async () => {
81+
const mockSubscriber2 = {
82+
load: jest.fn(),
83+
process: jest.fn(),
84+
}
85+
emitter.subscribe(mockSubscriber)
86+
emitter.subscribe(mockSubscriber2)
87+
88+
emitter.emit(mockSignal)
89+
90+
expect(mockSubscriber.process).not.toHaveBeenCalledWith(mockSignal)
91+
expect(mockSubscriber2.process).not.toHaveBeenCalledWith(mockSignal)
92+
93+
await emitter.start(mockCtx)
94+
95+
expect(mockSubscriber.process).toHaveBeenCalledTimes(1)
96+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
97+
expect(mockSubscriber2.process).toHaveBeenCalledTimes(1)
98+
expect(mockSubscriber2.process).toHaveBeenCalledWith(mockSignal)
99+
})
100+
101+
it('should handle multiple unsubscriptions', async () => {
102+
await emitter.start(mockCtx)
103+
104+
const anotherSubscriber: SignalsSubscriber = {
105+
load: jest.fn(),
106+
process: jest.fn(),
107+
}
108+
109+
emitter.subscribe(mockSubscriber, anotherSubscriber)
110+
emitter.unsubscribe(mockSubscriber, anotherSubscriber)
111+
112+
// Emit a signal to test if neither subscriber receives it
113+
emitter.emit(mockSignal)
114+
115+
expect(mockSubscriber.process).not.toHaveBeenCalled()
116+
expect(anotherSubscriber.process).not.toHaveBeenCalled()
117+
})
118+
119+
it('should buffer signals before initialization', async () => {
120+
emitter.emit(mockSignal)
121+
122+
expect(mockSubscriber.process).not.toHaveBeenCalled()
123+
124+
emitter.subscribe(mockSubscriber)
125+
await emitter.start(mockCtx)
126+
127+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
128+
expect(mockSubscriber.process).toHaveBeenCalledTimes(1)
129+
})
130+
131+
it('should process signals through middleware', async () => {
132+
emitter = await new SignalEmitter({ middleware: [mockMiddleware] })
133+
.subscribe(mockSubscriber)
134+
.start(mockCtx)
135+
136+
emitter.emit(mockSignal)
137+
138+
await sleep(0)
139+
140+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
141+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
142+
})
143+
144+
it('should run buffered signals through middleware', async () => {
145+
emitter = new SignalEmitter({ middleware: [mockMiddleware] }).subscribe(
146+
mockSubscriber
147+
)
148+
149+
emitter.emit(mockSignal)
150+
151+
await emitter.start(mockCtx)
152+
153+
await sleep(0)
154+
155+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
156+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
157+
})
158+
159+
it('should drop signals if middleware returns null', async () => {
160+
mockMiddleware.process.mockReturnValueOnce(null)
161+
162+
emitter = await new SignalEmitter({ middleware: [mockMiddleware] })
163+
.subscribe(mockSubscriber)
164+
.start(mockCtx)
165+
166+
// drop signal
167+
emitter.emit(mockSignal)
168+
169+
await sleep(0)
170+
171+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
172+
expect(mockSubscriber.process).not.toHaveBeenCalled()
173+
})
174+
175+
it('should not drop signals if middleware returns undefined', async () => {
176+
mockMiddleware.process.mockReturnValueOnce(undefined as any)
177+
178+
emitter = await new SignalEmitter({ middleware: [mockMiddleware] })
179+
.subscribe(mockSubscriber)
180+
.start(mockCtx)
181+
182+
emitter.emit(mockSignal)
183+
184+
await sleep(0)
185+
186+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
187+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
188+
})
189+
190+
it('should support .addMiddleware method', async () => {
191+
emitter = await new SignalEmitter()
192+
.addMiddleware(mockMiddleware)
193+
.subscribe(mockSubscriber)
194+
.start(mockCtx)
195+
196+
emitter.emit(mockSignal)
197+
198+
await sleep(0)
199+
200+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
201+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
202+
})
203+
204+
it('should support .removeMiddleware method', async () => {
205+
emitter = await new SignalEmitter()
206+
.addMiddleware(mockMiddleware)
207+
.removeMiddleware(mockMiddleware)
208+
.subscribe(mockSubscriber)
209+
.start(mockCtx)
210+
211+
emitter.emit(mockSignal)
212+
213+
await sleep(0)
214+
215+
expect(mockMiddleware.process).not.toHaveBeenCalled()
216+
expect(mockSubscriber.process).toHaveBeenCalled()
217+
})
218+
219+
it('should support add and removeMiddleware after start', async () => {
220+
emitter = await new SignalEmitter().subscribe(mockSubscriber).start(mockCtx)
221+
emitter.addMiddleware(mockMiddleware)
222+
emitter.emit(mockSignal)
223+
224+
await sleep(0)
225+
226+
expect(mockMiddleware.process).toHaveBeenCalledTimes(1)
227+
228+
emitter.removeMiddleware(mockMiddleware)
229+
230+
emitter.emit(mockSignal)
231+
await sleep(0)
232+
expect(mockMiddleware.process).toHaveBeenCalledTimes(1)
233+
})
234+
235+
it('should handle multiple middlewares being added', async () => {
236+
const mockMiddleware2 = {
237+
load: jest.fn(),
238+
process: jest.fn().mockReturnValue(mockSignal),
239+
}
240+
241+
emitter = await new SignalEmitter()
242+
.addMiddleware(mockMiddleware, mockMiddleware2)
243+
.subscribe(mockSubscriber)
244+
.start(mockCtx)
245+
246+
emitter.emit(mockSignal)
247+
248+
await sleep(0)
249+
250+
expect(mockMiddleware.process).toHaveBeenCalledWith(mockSignal)
251+
expect(mockMiddleware2.process).toHaveBeenCalledWith(mockSignal)
252+
expect(mockSubscriber.process).toHaveBeenCalledWith(mockSignal)
253+
})
254+
255+
it('middleware should block the flushing of signals forall .load promises are resolved', async () => {
256+
class MockMiddleware1 implements SignalsMiddleware {
257+
process(signal: Signal): Signal | null {
258+
// @ts-ignore
259+
signal.foo = this.ctx
260+
return signal
261+
}
262+
ctx!: SignalsMiddlewareContext
263+
async load(ctx: SignalsMiddlewareContext): Promise<void> {
264+
await sleep(50)
265+
this.ctx = ctx
266+
}
267+
}
268+
const mockMiddleware1 = new MockMiddleware1()
269+
const processSpy1 = jest.spyOn(mockMiddleware1, 'process')
270+
const loadSpy1 = jest.spyOn(mockMiddleware1, 'load')
271+
272+
const mockMiddleware2 =
273+
new (class MockMiddleware2 extends MockMiddleware1 {})()
274+
275+
const processSpy2 = jest.spyOn(mockMiddleware2, 'process')
276+
const loadSpy2 = jest.spyOn(mockMiddleware2, 'load')
277+
278+
emitter = new SignalEmitter().addMiddleware(
279+
mockMiddleware1,
280+
mockMiddleware2
281+
)
282+
283+
emitter.emit(mockSignal)
284+
285+
void emitter.start(mockCtx)
286+
287+
await sleep(0)
288+
289+
expect(processSpy1).not.toHaveBeenCalled()
290+
expect(processSpy2).not.toHaveBeenCalled()
291+
292+
await sleep(50)
293+
294+
expect(loadSpy1).toHaveBeenCalledTimes(1)
295+
expect(loadSpy2).toHaveBeenCalledTimes(1)
296+
297+
expect(processSpy1).toHaveBeenCalledWith(mockSignal)
298+
expect(processSpy1).toHaveLastReturnedWith(
299+
expect.objectContaining({ foo: mockCtx })
300+
)
301+
expect(processSpy2).toHaveBeenCalledWith(mockSignal)
302+
expect(processSpy2).toHaveLastReturnedWith(
303+
expect.objectContaining({ foo: mockCtx })
304+
)
305+
})
306+
307+
it('subscribers .load methods should NOT block the flushing of signals for other subscribers (only for themselves)', async () => {
308+
class MockSubscriber1 implements SignalsSubscriber {
309+
process(signal: Signal): Signal {
310+
return signal
311+
}
312+
ctx!: SignalsMiddlewareContext
313+
async load(ctx: SignalsMiddlewareContext): Promise<void> {
314+
this.ctx = ctx
315+
}
316+
}
317+
const mockSubscriber1 = new MockSubscriber1()
318+
const processSpy1 = jest.spyOn(mockSubscriber1, 'process')
319+
const loadSpy1 = jest.spyOn(mockSubscriber1, 'load')
320+
321+
class MockSubscriber2 extends MockSubscriber1 {
322+
_resolveLoadForTest!: () => void
323+
async load(): Promise<void> {
324+
return new Promise((resolve) => {
325+
this._resolveLoadForTest = resolve
326+
})
327+
}
328+
}
329+
330+
const mockSubscriber2 = new MockSubscriber2()
331+
const processSpy2 = jest.spyOn(mockSubscriber2, 'process')
332+
const loadSpy2 = jest.spyOn(mockSubscriber2, 'load')
333+
334+
emitter = new SignalEmitter().subscribe(mockSubscriber1, mockSubscriber2)
335+
336+
emitter.emit(mockSignal)
337+
338+
void emitter.start(mockCtx)
339+
340+
await sleep(0)
341+
342+
expect(loadSpy1).toHaveBeenCalledTimes(1)
343+
expect(processSpy1).toHaveBeenCalledWith(mockSignal)
344+
// Subscriber 1 loads immediately and Subscriber 2 , but they shouldn't block eachother
345+
expect(loadSpy2).toHaveBeenCalledTimes(1)
346+
expect(processSpy2).not.toHaveBeenCalled()
347+
348+
// Subscriber 2's load method should be resolved, so it should process the signal
349+
mockSubscriber2._resolveLoadForTest()
350+
await sleep(0)
351+
expect(processSpy2).toHaveBeenCalledTimes(1)
352+
})
353+
})

0 commit comments

Comments
 (0)