Skip to content

Commit 343cbbf

Browse files
committed
wip
1 parent 730b2d6 commit 343cbbf

File tree

2 files changed

+313
-4
lines changed

2 files changed

+313
-4
lines changed
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
import {
2+
DataSourceErrorKind,
3+
defaultHeaders,
4+
Info,
5+
internal,
6+
LDLogger,
7+
LDStreamingError,
8+
ProcessStreamResponse,
9+
subsystem,
10+
} from '@launchdarkly/js-sdk-common';
11+
12+
import StreamingProcessorFDv2 from '../../src/data_sources/StreamingProcessorFDv2';
13+
import { createBasicPlatform } from '../createBasicPlatform';
14+
15+
let logger: LDLogger;
16+
17+
const serviceEndpoints = {
18+
events: '',
19+
polling: '',
20+
streaming: 'https://mockstream.ld.com',
21+
diagnosticEventPath: '/diagnostic',
22+
analyticsEventPath: '/bulk',
23+
includeAuthorizationHeader: true,
24+
};
25+
26+
function getBasicConfiguration(inLogger: LDLogger) {
27+
return {
28+
sdkKey: 'testSdkKey',
29+
serviceEndpoints,
30+
logger: inLogger,
31+
};
32+
}
33+
34+
const dateNowString = '2023-08-10';
35+
const sdkKey = 'my-sdk-key';
36+
const events = {
37+
'server-intent': {
38+
data: '{"data": {"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}}',
39+
},
40+
'put-object': {
41+
data: '{"data": {"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}}',
42+
},
43+
'payload-transferred': {
44+
data: '{"data": {"state": "mockState", "version": 1}}',
45+
},
46+
};
47+
48+
let basicPlatform: any;
49+
50+
beforeEach(() => {
51+
basicPlatform = createBasicPlatform();
52+
logger = {
53+
error: jest.fn(),
54+
warn: jest.fn(),
55+
info: jest.fn(),
56+
debug: jest.fn(),
57+
};
58+
});
59+
60+
const createMockEventSource = (streamUri: string = '', options: any = {}) => ({
61+
streamUri,
62+
options,
63+
onclose: jest.fn(),
64+
addEventListener: jest.fn(),
65+
close: jest.fn(),
66+
});
67+
68+
describe('given a stream processor with mock event source', () => {
69+
let info: Info;
70+
let streamingProcessor: subsystem.LDStreamProcessor;
71+
let diagnosticsManager: internal.DiagnosticsManager;
72+
let listener: internal.PayloadListener;
73+
let mockEventSource: any;
74+
let mockErrorHandler: jest.Mock;
75+
let simulateEvents: (e?: any) => void;
76+
let simulateError: (e: { status: number; message: string }) => boolean;
77+
78+
beforeAll(() => {
79+
jest.useFakeTimers();
80+
jest.setSystemTime(new Date(dateNowString));
81+
});
82+
83+
afterAll(() => {
84+
jest.useRealTimers();
85+
});
86+
87+
beforeEach(() => {
88+
mockErrorHandler = jest.fn();
89+
90+
info = basicPlatform.info;
91+
92+
basicPlatform.requests = {
93+
createEventSource: jest.fn((streamUri: string, options: any) => {
94+
mockEventSource = createMockEventSource(streamUri, options);
95+
return mockEventSource;
96+
}),
97+
} as any;
98+
simulateEvents = (e: any = events) => {
99+
mockEventSource.addEventListener.mock.calls[0][1](e['server-intent']); // server intent listener
100+
mockEventSource.addEventListener.mock.calls[1][1](e['put-object']); // put listener
101+
mockEventSource.addEventListener.mock.calls[3][1](e['payload-transferred']); // payload transferred listener
102+
};
103+
simulateError = (e: { status: number; message: string }): boolean =>
104+
mockEventSource.options.errorFilter(e);
105+
106+
listener = jest.fn();
107+
108+
diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {});
109+
streamingProcessor = new StreamingProcessorFDv2(
110+
{
111+
basicConfiguration: getBasicConfiguration(logger),
112+
platform: basicPlatform,
113+
},
114+
'/all',
115+
[],
116+
listener,
117+
{
118+
authorization: 'my-sdk-key',
119+
'user-agent': 'TestUserAgent/2.0.2',
120+
'x-launchdarkly-wrapper': 'Rapper/1.2.3',
121+
},
122+
diagnosticsManager,
123+
mockErrorHandler,
124+
);
125+
126+
jest.spyOn(streamingProcessor, 'stop');
127+
streamingProcessor.start();
128+
});
129+
130+
afterEach(() => {
131+
streamingProcessor.close();
132+
jest.resetAllMocks();
133+
});
134+
135+
it('uses expected uri and eventSource init args', () => {
136+
expect(basicPlatform.requests.createEventSource).toBeCalledWith(
137+
`${serviceEndpoints.streaming}/all`,
138+
{
139+
errorFilter: expect.any(Function),
140+
headers: defaultHeaders(sdkKey, info, undefined),
141+
initialRetryDelayMillis: 1000,
142+
readTimeoutMillis: 300000,
143+
retryResetIntervalMillis: 60000,
144+
},
145+
);
146+
});
147+
148+
it('sets streamInitialReconnectDelay correctly', () => {
149+
streamingProcessor = new StreamingProcessorFDv2(
150+
{
151+
basicConfiguration: getBasicConfiguration(logger),
152+
platform: basicPlatform,
153+
},
154+
'/all',
155+
[],
156+
listener,
157+
{
158+
authorization: 'my-sdk-key',
159+
'user-agent': 'TestUserAgent/2.0.2',
160+
'x-launchdarkly-wrapper': 'Rapper/1.2.3',
161+
},
162+
diagnosticsManager,
163+
mockErrorHandler,
164+
22,
165+
);
166+
streamingProcessor.start();
167+
168+
expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith(
169+
`${serviceEndpoints.streaming}/all`,
170+
{
171+
errorFilter: expect.any(Function),
172+
headers: defaultHeaders(sdkKey, info, undefined),
173+
initialRetryDelayMillis: 22000,
174+
readTimeoutMillis: 300000,
175+
retryResetIntervalMillis: 60000,
176+
},
177+
);
178+
});
179+
180+
it('adds listeners', () => {
181+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
182+
1,
183+
'server-intent',
184+
expect.any(Function),
185+
);
186+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
187+
2,
188+
'put-object',
189+
expect.any(Function),
190+
);
191+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
192+
3,
193+
'delete-object',
194+
expect.any(Function),
195+
);
196+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
197+
4,
198+
'payload-transferred',
199+
expect.any(Function),
200+
);
201+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
202+
5,
203+
'goodbye',
204+
expect.any(Function),
205+
);
206+
expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith(
207+
6,
208+
'error',
209+
expect.any(Function),
210+
);
211+
});
212+
213+
it('executes payload listener', () => {
214+
simulateEvents();
215+
const patchHandler = mockEventSource.addEventListener.mock.calls[1][1];
216+
patchHandler(events);
217+
218+
expect(listener).toHaveBeenCalled();
219+
});
220+
221+
it('passes error to callback if json data is malformed', async () => {
222+
simulateEvents({
223+
'server-intent': {
224+
data: '{"data": {"payloads": [{"intent INTENTIONAL CORRUPTION MUWAHAHAHA',
225+
},
226+
});
227+
228+
// this._errorHandler?.(
229+
// DataSourceErrorKind.InvalidData,
230+
// 'Malformed JSON data in event stream',
231+
// );
232+
233+
resume here at fixing this test, was updating the expected parameter.
234+
expect(mockErrorHandler).toHaveBeenCalledWith(
235+
1,
236+
new LDStreamingError(DataSourceErrorKind.InvalidData, 'Malformed JSON data in event stream'),
237+
);
238+
});
239+
240+
it('calls error handler if event.data prop is missing', async () => {
241+
simulateEvents({ flags: {} });
242+
expect(listener).toHaveBeenCalled();
243+
expect(mockErrorHandler.mock.lastCall[0].message).toMatch(/unexpected payload/i);
244+
});
245+
246+
it('closes and stops', async () => {
247+
streamingProcessor.close();
248+
249+
expect(streamingProcessor.stop).toBeCalled();
250+
expect(mockEventSource.close).toBeCalled();
251+
// @ts-ignore
252+
expect(streamingProcessor.eventSource).toBeUndefined();
253+
});
254+
255+
it('creates a stream init event', async () => {
256+
const startTime = Date.now();
257+
simulateEvents();
258+
259+
const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0);
260+
expect(diagnosticEvent.streamInits.length).toEqual(1);
261+
const si = diagnosticEvent.streamInits[0];
262+
expect(si.timestamp).toEqual(startTime);
263+
expect(si.failed).toBeFalsy();
264+
expect(si.durationMillis).toBeGreaterThanOrEqual(0);
265+
});
266+
267+
describe.each([400, 408, 429, 500, 503])('given recoverable http errors', (status) => {
268+
it(`continues retrying after error: ${status}`, () => {
269+
const startTime = Date.now();
270+
const testError = { status, message: 'retry. recoverable.' };
271+
const willRetry = simulateError(testError);
272+
273+
expect(willRetry).toBeTruthy();
274+
expect(mockErrorHandler).not.toBeCalled();
275+
expect(logger.warn).toBeCalledWith(
276+
expect.stringMatching(new RegExp(`${status}.*will retry`)),
277+
);
278+
279+
const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0);
280+
expect(diagnosticEvent.streamInits.length).toEqual(1);
281+
const si = diagnosticEvent.streamInits[0];
282+
expect(si.timestamp).toEqual(startTime);
283+
expect(si.failed).toBeTruthy();
284+
expect(si.durationMillis).toBeGreaterThanOrEqual(0);
285+
});
286+
});
287+
288+
describe.each([401, 403])('given irrecoverable http errors', (status) => {
289+
it(`stops retrying after error: ${status}`, () => {
290+
const startTime = Date.now();
291+
const testError = { status, message: 'stopping. irrecoverable.' };
292+
const willRetry = simulateError(testError);
293+
294+
expect(willRetry).toBeFalsy();
295+
expect(mockErrorHandler).toBeCalledWith(
296+
new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status),
297+
);
298+
expect(logger.error).toBeCalledWith(
299+
expect.stringMatching(new RegExp(`${status}.*permanently`)),
300+
);
301+
302+
const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0);
303+
expect(diagnosticEvent.streamInits.length).toEqual(1);
304+
const si = diagnosticEvent.streamInits[0];
305+
expect(si.timestamp).toEqual(startTime);
306+
expect(si.failed).toBeTruthy();
307+
expect(si.durationMillis).toBeGreaterThanOrEqual(0);
308+
});
309+
});
310+
});

packages/shared/sdk-server/src/data_sources/createStreamListenersFDv2.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { internal, LDLogger, VoidFunction } from '@launchdarkly/js-sdk-common';
2-
import { Payload } from '@launchdarkly/js-sdk-common/dist/esm/internal';
32

43
import {
54
LDDataSourceUpdates,
@@ -24,12 +23,12 @@ export const createPayloadListener =
2423
logger?: LDLogger,
2524
basisReceived: VoidFunction = () => {},
2625
) =>
27-
(payload: Payload) => {
26+
(payload: internal.Payload) => {
2827
// This conversion from FDv2 updates to the existing types used with DataSourceUpdates should be temporary. Eventually
2928
// DataSourceUpdates will support update(...) taking in the list of updates.
3029
if (payload.basis) {
3130
// convert basis to init param structure
32-
// TODO: remove conversion as part of FDv2 Persistence work
31+
// TODO: SDK-850 - remove conversion as part of FDv2 Persistence work
3332
const converted: LDFeatureStoreDataStorage = {};
3433
payload.updates.forEach((it: internal.Update) => {
3534
const namespace = namespaceForKind(it.kind);
@@ -56,7 +55,7 @@ export const createPayloadListener =
5655
dataSourceUpdates.init(converted, basisReceived);
5756
} else {
5857
// convert data to upsert param
59-
// TODO: remove conversion as part of FDv2 Persistence work
58+
// TODO: SDK-850 - remove conversion as part of FDv2 Persistence work
6059
payload.updates.forEach((it: internal.Update) => {
6160
const converted: LDKeyedFeatureStoreItem = {
6261
key: it.key,

0 commit comments

Comments
 (0)