Skip to content

Commit 7f5c275

Browse files
feat: Adds StreamingProcessor for FDv2 to sdk-server package. (#707)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [x] I have validated my changes against all supported platform versions **Related issues** SDK-849 **Describe the solution you've provided** Inserted PayloadReader between EventSource and DataSourceUpdates. Contract test glue code can be found on `ta/sdk-849/fdv2-streaming-datasource-contract-test-glue` --------- Co-authored-by: Ryan Lamb <[email protected]>
1 parent 220b6d6 commit 7f5c275

File tree

11 files changed

+1264
-6
lines changed

11 files changed

+1264
-6
lines changed
Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
import { EventListener, EventName, LDLogger } from '../../../src/api';
2+
import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader';
3+
4+
class MockEventStream implements EventStream {
5+
private _listeners: Record<EventName, EventListener> = {};
6+
7+
addEventListener(eventName: EventName, listener: EventListener): void {
8+
this._listeners[eventName] = listener;
9+
}
10+
11+
simulateEvent(eventName: EventName, event: { data?: string }) {
12+
this._listeners[eventName](event);
13+
}
14+
}
15+
16+
it('it sets basis to true when intent code is xfer-full', () => {
17+
const mockStream = new MockEventStream();
18+
const receivedPayloads: Payload[] = [];
19+
const readerUnderTest = new PayloadReader(mockStream, {
20+
mockKind: (it) => it, // obj processor that just returns the same obj
21+
});
22+
readerUnderTest.addPayloadListener((it) => {
23+
receivedPayloads.push(it);
24+
});
25+
26+
mockStream.simulateEvent('server-intent', {
27+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
28+
});
29+
mockStream.simulateEvent('payload-transferred', {
30+
data: '{"state": "mockState", "version": 1}',
31+
});
32+
expect(receivedPayloads.length).toEqual(1);
33+
expect(receivedPayloads[0].id).toEqual('mockId');
34+
expect(receivedPayloads[0].state).toEqual('mockState');
35+
expect(receivedPayloads[0].basis).toEqual(true);
36+
});
37+
38+
it('it sets basis to false when intent code is xfer-changes', () => {
39+
const mockStream = new MockEventStream();
40+
const receivedPayloads: Payload[] = [];
41+
const readerUnderTest = new PayloadReader(mockStream, {
42+
mockKind: (it) => it, // obj processor that just returns the same obj
43+
});
44+
readerUnderTest.addPayloadListener((it) => {
45+
receivedPayloads.push(it);
46+
});
47+
48+
mockStream.simulateEvent('server-intent', {
49+
data: '{"payloads": [{"code": "xfer-changes", "id": "mockId"}]}',
50+
});
51+
mockStream.simulateEvent('payload-transferred', {
52+
data: '{"state": "mockState", "version": 1}',
53+
});
54+
expect(receivedPayloads.length).toEqual(1);
55+
expect(receivedPayloads[0].id).toEqual('mockId');
56+
expect(receivedPayloads[0].state).toEqual('mockState');
57+
expect(receivedPayloads[0].basis).toEqual(false);
58+
});
59+
60+
it('it includes multiple types of updates in payload', () => {
61+
const mockStream = new MockEventStream();
62+
const receivedPayloads: Payload[] = [];
63+
const readerUnderTest = new PayloadReader(mockStream, {
64+
mockKind: (it) => it, // obj processor that just returns the same obj
65+
});
66+
readerUnderTest.addPayloadListener((it) => {
67+
receivedPayloads.push(it);
68+
});
69+
70+
mockStream.simulateEvent('server-intent', {
71+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
72+
});
73+
mockStream.simulateEvent('put-object', {
74+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
75+
});
76+
mockStream.simulateEvent('delete-object', {
77+
data: '{"kind": "mockKind", "key": "flagB", "version": 123}',
78+
});
79+
mockStream.simulateEvent('put-object', {
80+
data: '{"kind": "mockKind", "key": "flagC", "version": 123, "object": {"objectFieldC": "objectValueC"}}',
81+
});
82+
mockStream.simulateEvent('payload-transferred', {
83+
data: '{"state": "mockState", "version": 1}',
84+
});
85+
expect(receivedPayloads.length).toEqual(1);
86+
expect(receivedPayloads[0].id).toEqual('mockId');
87+
expect(receivedPayloads[0].state).toEqual('mockState');
88+
expect(receivedPayloads[0].basis).toEqual(true);
89+
expect(receivedPayloads[0].updates.length).toEqual(3);
90+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
91+
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
92+
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
93+
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
94+
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldC: 'objectValueC' });
95+
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
96+
});
97+
98+
it('it does not include messages thats are not between server-intent and payloader-transferred', () => {
99+
const mockStream = new MockEventStream();
100+
const receivedPayloads: Payload[] = [];
101+
const readerUnderTest = new PayloadReader(mockStream, {
102+
mockKind: (it) => it, // obj processor that just returns the same obj
103+
});
104+
readerUnderTest.addPayloadListener((it) => {
105+
receivedPayloads.push(it);
106+
});
107+
108+
mockStream.simulateEvent('put-object', {
109+
data: '{"kind": "mockKind", "key": "flagShouldIgnore", "version": 123, "object": {"objectFieldShouldIgnore": "objectValueShouldIgnore"}}',
110+
});
111+
mockStream.simulateEvent('server-intent', {
112+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
113+
});
114+
mockStream.simulateEvent('put-object', {
115+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
116+
});
117+
mockStream.simulateEvent('payload-transferred', {
118+
data: '{"state": "mockState", "version": 1}',
119+
});
120+
expect(receivedPayloads.length).toEqual(1);
121+
expect(receivedPayloads[0].updates.length).toEqual(1);
122+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
123+
});
124+
125+
it('logs prescribed message when goodbye event is encountered', () => {
126+
const mockLogger: LDLogger = {
127+
error: jest.fn(),
128+
warn: jest.fn(),
129+
info: jest.fn(),
130+
debug: jest.fn(),
131+
};
132+
const mockStream = new MockEventStream();
133+
const receivedPayloads: Payload[] = [];
134+
const readerUnderTest = new PayloadReader(
135+
mockStream,
136+
{
137+
mockKind: (it) => it, // obj processor that just returns the same obj
138+
},
139+
undefined,
140+
mockLogger,
141+
);
142+
readerUnderTest.addPayloadListener((it) => {
143+
receivedPayloads.push(it);
144+
});
145+
146+
mockStream.simulateEvent('goodbye', {
147+
data: '{"reason": "Bye"}',
148+
});
149+
150+
expect(receivedPayloads.length).toEqual(0);
151+
expect(mockLogger.info).toHaveBeenCalledWith(
152+
'Goodbye was received from the LaunchDarkly connection with reason: Bye.',
153+
);
154+
});
155+
156+
it('logs prescribed message when error event is encountered', () => {
157+
const mockLogger: LDLogger = {
158+
error: jest.fn(),
159+
warn: jest.fn(),
160+
info: jest.fn(),
161+
debug: jest.fn(),
162+
};
163+
const mockStream = new MockEventStream();
164+
const receivedPayloads: Payload[] = [];
165+
const readerUnderTest = new PayloadReader(
166+
mockStream,
167+
{
168+
mockKind: (it) => it, // obj processor that just returns the same obj
169+
},
170+
undefined,
171+
mockLogger,
172+
);
173+
readerUnderTest.addPayloadListener((it) => {
174+
receivedPayloads.push(it);
175+
});
176+
177+
mockStream.simulateEvent('server-intent', {
178+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
179+
});
180+
mockStream.simulateEvent('put-object', {
181+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
182+
});
183+
mockStream.simulateEvent('error', {
184+
data: '{"reason": "Womp womp"}',
185+
});
186+
mockStream.simulateEvent('payload-transferred', {
187+
data: '{"state": "mockState", "version": 1}',
188+
});
189+
expect(receivedPayloads.length).toEqual(0);
190+
expect(mockLogger.info).toHaveBeenCalledWith(
191+
'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.',
192+
);
193+
});
194+
195+
it('discards partially transferred data when an error is encountered', () => {
196+
const mockLogger: LDLogger = {
197+
error: jest.fn(),
198+
warn: jest.fn(),
199+
info: jest.fn(),
200+
debug: jest.fn(),
201+
};
202+
const mockStream = new MockEventStream();
203+
const receivedPayloads: Payload[] = [];
204+
const readerUnderTest = new PayloadReader(
205+
mockStream,
206+
{
207+
mockKind: (it) => it, // obj processor that just returns the same obj
208+
},
209+
undefined,
210+
mockLogger,
211+
);
212+
readerUnderTest.addPayloadListener((it) => {
213+
receivedPayloads.push(it);
214+
});
215+
216+
mockStream.simulateEvent('server-intent', {
217+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
218+
});
219+
mockStream.simulateEvent('put-object', {
220+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
221+
});
222+
mockStream.simulateEvent('error', {
223+
data: '{"reason": "Womp womp"}',
224+
});
225+
mockStream.simulateEvent('payload-transferred', {
226+
data: '{"state": "mockState", "version": 1}',
227+
});
228+
mockStream.simulateEvent('server-intent', {
229+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId2"}]}',
230+
});
231+
mockStream.simulateEvent('put-object', {
232+
data: '{"kind": "mockKind", "key": "flagX", "version": 123, "object": {"objectFieldX": "objectValueX"}}',
233+
});
234+
mockStream.simulateEvent('delete-object', {
235+
data: '{"kind": "mockKind", "key": "flagY", "version": 123}',
236+
});
237+
mockStream.simulateEvent('put-object', {
238+
data: '{"kind": "mockKind", "key": "flagZ", "version": 123, "object": {"objectFieldZ": "objectValueZ"}}',
239+
});
240+
mockStream.simulateEvent('payload-transferred', {
241+
data: '{"state": "mockState2", "version": 1}',
242+
});
243+
expect(receivedPayloads.length).toEqual(1);
244+
expect(receivedPayloads[0].id).toEqual('mockId2');
245+
expect(receivedPayloads[0].state).toEqual('mockState2');
246+
expect(receivedPayloads[0].basis).toEqual(true);
247+
expect(receivedPayloads[0].updates.length).toEqual(3);
248+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
249+
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
250+
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
251+
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
252+
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
253+
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
254+
});
255+
256+
it('silently ignores unrecognized kinds', () => {
257+
const mockStream = new MockEventStream();
258+
const receivedPayloads: Payload[] = [];
259+
const readerUnderTest = new PayloadReader(mockStream, {
260+
mockKind: (it) => it, // obj processor that just returns the same obj
261+
});
262+
readerUnderTest.addPayloadListener((it) => {
263+
receivedPayloads.push(it);
264+
});
265+
266+
mockStream.simulateEvent('server-intent', {
267+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
268+
});
269+
mockStream.simulateEvent('put-object', {
270+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
271+
});
272+
mockStream.simulateEvent('put-object', {
273+
data: '{"kind": "ItsMeYourBrotherUnrecognizedKind", "key": "unrecognized", "version": 123, "object": {"unrecognized": "unrecognized"}}',
274+
});
275+
mockStream.simulateEvent('payload-transferred', {
276+
data: '{"state": "mockState", "version": 1}',
277+
});
278+
expect(receivedPayloads.length).toEqual(1);
279+
expect(receivedPayloads[0].id).toEqual('mockId');
280+
expect(receivedPayloads[0].state).toEqual('mockState');
281+
expect(receivedPayloads[0].basis).toEqual(true);
282+
expect(receivedPayloads[0].updates.length).toEqual(1);
283+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
284+
});
285+
286+
it('ignores additional payloads beyond the first payload in the server-intent message', () => {
287+
const mockStream = new MockEventStream();
288+
const receivedPayloads: Payload[] = [];
289+
const readerUnderTest = new PayloadReader(mockStream, {
290+
mockKind: (it) => it, // obj processor that just returns the same obj
291+
});
292+
readerUnderTest.addPayloadListener((it) => {
293+
receivedPayloads.push(it);
294+
});
295+
296+
mockStream.simulateEvent('server-intent', {
297+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"},{"code": "IShouldBeIgnored", "id": "IShouldBeIgnored"}]}',
298+
});
299+
mockStream.simulateEvent('put-object', {
300+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
301+
});
302+
mockStream.simulateEvent('put-object', {
303+
data: '{"kind": "ItsMeYourBrotherUnrecognizedKind", "key": "unrecognized", "version": 123, "object": {"unrecognized": "unrecognized"}}',
304+
});
305+
mockStream.simulateEvent('payload-transferred', {
306+
data: '{"state": "mockState", "version": 1}',
307+
});
308+
expect(receivedPayloads.length).toEqual(1);
309+
expect(receivedPayloads[0].id).toEqual('mockId');
310+
expect(receivedPayloads[0].state).toEqual('mockState');
311+
expect(receivedPayloads[0].basis).toEqual(true);
312+
expect(receivedPayloads[0].updates.length).toEqual(1);
313+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
314+
});

packages/shared/common/src/api/platform/EventSource.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { HttpErrorResponse } from './Requests';
22

3-
export type EventName = 'delete' | 'patch' | 'ping' | 'put';
3+
export type EventName = string;
44
export type EventListener = (event?: { data?: any }) => void;
55
export type ProcessStreamResponse = {
66
deserializeData: (data: string) => any;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader';
2+
3+
export { Payload, PayloadListener, PayloadReader, Update };

0 commit comments

Comments
 (0)