Skip to content

Commit c1089a5

Browse files
tanderson-ldTodd Anderson
andauthored
feat: adds polling synchronizer support (#816)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [ ] I have validated my changes against all supported platform versions This will be done at a later time on the target branch during integration testing. **Related issues** SDK-858 and SDK-851 **Describe the solution you've provided** Refactors payload processing to be reusable between polling data sources and streaming data sources. Adds OneShotInitializers and PollingSynchronizer. Adds support to contract tests for sdk-test-harness/feat/fdv2 branch. --------- Co-authored-by: Todd Anderson <[email protected]>
1 parent e2c26af commit c1089a5

File tree

15 files changed

+1084
-188
lines changed

15 files changed

+1084
-188
lines changed

contract-tests/sdkClientEntity.js

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) {
2323

2424
const maybeTime = (seconds) =>
2525
seconds === undefined || seconds === null ? undefined : seconds / 1000;
26+
2627
if (options.streaming) {
2728
cf.streamUri = options.streaming.baseUri;
2829
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
@@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) {
3334
if (options.polling) {
3435
cf.stream = false;
3536
cf.baseUri = options.polling.baseUri;
36-
cf.pollInterface = options.polling.pollIntervalMs / 1000;
37+
cf.pollInterval = options.polling.pollIntervalMs / 1000;
3738
if (options.polling.filter) {
3839
cf.payloadFilterKey = options.polling.filter;
3940
}
@@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) {
8182
cf.wrapperVersion = options.wrapper.version;
8283
}
8384
}
85+
if (options.dataSystem) {
86+
const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming;
87+
const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling;
88+
89+
if (dataSourceStreamingOptions) {
90+
cf.streamUri = dataSourceStreamingOptions.baseUri;
91+
cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs);
92+
if (dataSourceStreamingOptions.filter) {
93+
cf.payloadFilterKey = dataSourceStreamingOptions.filter;
94+
}
95+
}
96+
if (dataSourcePollingOptions) {
97+
cf.stream = false;
98+
cf.baseUri = dataSourcePollingOptions.baseUri;
99+
cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000;
100+
if (dataSourcePollingOptions.filter) {
101+
cf.payloadFilterKey = dataSourcePollingOptions.filter;
102+
}
103+
}
104+
105+
let dataSourceOptions;
106+
if (dataSourceStreamingOptions && dataSourcePollingOptions) {
107+
dataSourceOptions = {
108+
type: 'standard',
109+
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
110+
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
111+
...(dataSourcePollingOptions.pollIntervalMs != null &&
112+
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
113+
}
114+
} else if (dataSourceStreamingOptions) {
115+
dataSourceOptions = {
116+
type: 'streamingOnly',
117+
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
118+
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
119+
}
120+
} else if (dataSourcePollingOptions) {
121+
dataSourceOptions = {
122+
type: 'pollingOnly',
123+
...(dataSourcePollingOptions.pollIntervalMs != null &&
124+
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
125+
}
126+
} else {
127+
// No data source options were specified
128+
dataSourceOptions = undefined;
129+
}
130+
131+
if (options.dataSystem.payloadFilter) {
132+
cf.payloadFilterKey = options.dataSystem.payloadFilter;
133+
}
134+
135+
cf.dataSystem = {
136+
dataSource: dataSourceOptions,
137+
}
138+
}
139+
84140
return cf;
85141
}
86142

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
11
streaming/validation/drop and reconnect if stream event has malformed JSON
22
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema
3+
4+
streaming/fdv2/reconnection state management/initializes from polling initializer
5+
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
6+
7+
streaming/fdv2/reconnection state management/saves previously known state
8+
streaming/fdv2/reconnection state management/replaces previously known state
9+
streaming/fdv2/reconnection state management/updates previously known state
10+
streaming/fdv2/ignores model version
11+
streaming/fdv2/can discard partial events on errors

packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts renamed to packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { EventListener, EventName, LDLogger } from '../../../src/api';
2-
import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader';
2+
import { Payload } from '../../../src/internal/fdv2/payloadProcessor';
3+
import { EventStream, PayloadStreamReader } from '../../../src/internal/fdv2/payloadStreamReader';
4+
35

46
class MockEventStream implements EventStream {
57
private _listeners: Record<EventName, EventListener> = {};
@@ -16,7 +18,7 @@ class MockEventStream implements EventStream {
1618
it('it sets basis to true when intent code is xfer-full', () => {
1719
const mockStream = new MockEventStream();
1820
const receivedPayloads: Payload[] = [];
19-
const readerUnderTest = new PayloadReader(mockStream, {
21+
const readerUnderTest = new PayloadStreamReader(mockStream, {
2022
mockKind: (it) => it, // obj processor that just returns the same obj
2123
});
2224
readerUnderTest.addPayloadListener((it) => {
@@ -38,7 +40,7 @@ it('it sets basis to true when intent code is xfer-full', () => {
3840
it('it sets basis to false when intent code is xfer-changes', () => {
3941
const mockStream = new MockEventStream();
4042
const receivedPayloads: Payload[] = [];
41-
const readerUnderTest = new PayloadReader(mockStream, {
43+
const readerUnderTest = new PayloadStreamReader(mockStream, {
4244
mockKind: (it) => it, // obj processor that just returns the same obj
4345
});
4446
readerUnderTest.addPayloadListener((it) => {
@@ -57,10 +59,71 @@ it('it sets basis to false when intent code is xfer-changes', () => {
5759
expect(receivedPayloads[0].basis).toEqual(false);
5860
});
5961

62+
it('it sets basis to false and emits empty payload when intent code is none', () => {
63+
const mockStream = new MockEventStream();
64+
const receivedPayloads: Payload[] = [];
65+
const readerUnderTest = new PayloadStreamReader(mockStream, {
66+
mockKind: (it) => it, // obj processor that just returns the same obj
67+
});
68+
readerUnderTest.addPayloadListener((it) => {
69+
receivedPayloads.push(it);
70+
});
71+
72+
mockStream.simulateEvent('server-intent', {
73+
data: '{"payloads": [{"code": "none", "id": "mockId", "target": 42}]}',
74+
});
75+
expect(receivedPayloads.length).toEqual(1);
76+
expect(receivedPayloads[0].id).toEqual('mockId');
77+
expect(receivedPayloads[0].version).toEqual(42);
78+
expect(receivedPayloads[0].basis).toEqual(false);
79+
});
80+
81+
it('it handles xfer-full then xfer-changes', () => {
82+
const mockStream = new MockEventStream();
83+
const receivedPayloads: Payload[] = [];
84+
const readerUnderTest = new PayloadStreamReader(mockStream, {
85+
mockKind: (it) => it, // obj processor that just returns the same obj
86+
});
87+
readerUnderTest.addPayloadListener((it) => {
88+
receivedPayloads.push(it);
89+
});
90+
91+
mockStream.simulateEvent('server-intent', {
92+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
93+
});
94+
mockStream.simulateEvent('put-object', {
95+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
96+
});
97+
mockStream.simulateEvent('payload-transferred', {
98+
data: '{"state": "mockState", "version": 1}',
99+
});
100+
101+
mockStream.simulateEvent('put-object', {
102+
data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}',
103+
});
104+
mockStream.simulateEvent('payload-transferred', {
105+
data: '{"state": "mockState", "version": 1}',
106+
});
107+
expect(receivedPayloads.length).toEqual(2);
108+
expect(receivedPayloads[0].id).toEqual('mockId');
109+
expect(receivedPayloads[0].state).toEqual('mockState');
110+
expect(receivedPayloads[0].basis).toEqual(true);
111+
expect(receivedPayloads[0].updates.length).toEqual(1);
112+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
113+
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
114+
115+
expect(receivedPayloads[1].id).toEqual('mockId');
116+
expect(receivedPayloads[1].state).toEqual('mockState');
117+
expect(receivedPayloads[1].basis).toEqual(false);
118+
expect(receivedPayloads[1].updates.length).toEqual(1);
119+
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' });
120+
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
121+
});
122+
60123
it('it includes multiple types of updates in payload', () => {
61124
const mockStream = new MockEventStream();
62125
const receivedPayloads: Payload[] = [];
63-
const readerUnderTest = new PayloadReader(mockStream, {
126+
const readerUnderTest = new PayloadStreamReader(mockStream, {
64127
mockKind: (it) => it, // obj processor that just returns the same obj
65128
});
66129
readerUnderTest.addPayloadListener((it) => {
@@ -98,7 +161,7 @@ it('it includes multiple types of updates in payload', () => {
98161
it('it does not include messages thats are not between server-intent and payloader-transferred', () => {
99162
const mockStream = new MockEventStream();
100163
const receivedPayloads: Payload[] = [];
101-
const readerUnderTest = new PayloadReader(mockStream, {
164+
const readerUnderTest = new PayloadStreamReader(mockStream, {
102165
mockKind: (it) => it, // obj processor that just returns the same obj
103166
});
104167
readerUnderTest.addPayloadListener((it) => {
@@ -131,7 +194,7 @@ it('logs prescribed message when goodbye event is encountered', () => {
131194
};
132195
const mockStream = new MockEventStream();
133196
const receivedPayloads: Payload[] = [];
134-
const readerUnderTest = new PayloadReader(
197+
const readerUnderTest = new PayloadStreamReader(
135198
mockStream,
136199
{
137200
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -162,7 +225,7 @@ it('logs prescribed message when error event is encountered', () => {
162225
};
163226
const mockStream = new MockEventStream();
164227
const receivedPayloads: Payload[] = [];
165-
const readerUnderTest = new PayloadReader(
228+
const readerUnderTest = new PayloadStreamReader(
166229
mockStream,
167230
{
168231
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -183,12 +246,15 @@ it('logs prescribed message when error event is encountered', () => {
183246
mockStream.simulateEvent('error', {
184247
data: '{"reason": "Womp womp"}',
185248
});
249+
mockStream.simulateEvent('put-object', {
250+
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
251+
});
186252
mockStream.simulateEvent('payload-transferred', {
187253
data: '{"state": "mockState", "version": 1}',
188254
});
189-
expect(receivedPayloads.length).toEqual(0);
255+
expect(receivedPayloads.length).toEqual(1);
190256
expect(mockLogger.info).toHaveBeenCalledWith(
191-
'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.',
257+
'An issue was encountered receiving updates for payload mockId with reason: Womp womp.',
192258
);
193259
});
194260

@@ -201,7 +267,7 @@ it('discards partially transferred data when an error is encountered', () => {
201267
};
202268
const mockStream = new MockEventStream();
203269
const receivedPayloads: Payload[] = [];
204-
const readerUnderTest = new PayloadReader(
270+
const readerUnderTest = new PayloadStreamReader(
205271
mockStream,
206272
{
207273
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -222,6 +288,9 @@ it('discards partially transferred data when an error is encountered', () => {
222288
mockStream.simulateEvent('error', {
223289
data: '{"reason": "Womp womp"}',
224290
});
291+
mockStream.simulateEvent('put-object', {
292+
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
293+
});
225294
mockStream.simulateEvent('payload-transferred', {
226295
data: '{"state": "mockState", "version": 1}',
227296
});
@@ -240,23 +309,29 @@ it('discards partially transferred data when an error is encountered', () => {
240309
mockStream.simulateEvent('payload-transferred', {
241310
data: '{"state": "mockState2", "version": 1}',
242311
});
243-
expect(receivedPayloads.length).toEqual(1);
244-
expect(receivedPayloads[0].id).toEqual('mockId2');
245-
expect(receivedPayloads[0].state).toEqual('mockState2');
312+
expect(receivedPayloads.length).toEqual(2);
313+
expect(receivedPayloads[0].id).toEqual('mockId');
314+
expect(receivedPayloads[0].state).toEqual('mockState');
246315
expect(receivedPayloads[0].basis).toEqual(true);
247-
expect(receivedPayloads[0].updates.length).toEqual(3);
248-
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
316+
expect(receivedPayloads[0].updates.length).toEqual(1);
317+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' });
249318
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
250-
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
251-
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
252-
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
253-
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
319+
expect(receivedPayloads[1].id).toEqual('mockId2');
320+
expect(receivedPayloads[1].state).toEqual('mockState2');
321+
expect(receivedPayloads[1].basis).toEqual(true);
322+
expect(receivedPayloads[1].updates.length).toEqual(3);
323+
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
324+
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
325+
expect(receivedPayloads[1].updates[1].object).toEqual(undefined);
326+
expect(receivedPayloads[1].updates[1].deleted).toEqual(true);
327+
expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
328+
expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined);
254329
});
255330

256331
it('silently ignores unrecognized kinds', () => {
257332
const mockStream = new MockEventStream();
258333
const receivedPayloads: Payload[] = [];
259-
const readerUnderTest = new PayloadReader(mockStream, {
334+
const readerUnderTest = new PayloadStreamReader(mockStream, {
260335
mockKind: (it) => it, // obj processor that just returns the same obj
261336
});
262337
readerUnderTest.addPayloadListener((it) => {
@@ -286,7 +361,7 @@ it('silently ignores unrecognized kinds', () => {
286361
it('ignores additional payloads beyond the first payload in the server-intent message', () => {
287362
const mockStream = new MockEventStream();
288363
const receivedPayloads: Payload[] = [];
289-
const readerUnderTest = new PayloadReader(mockStream, {
364+
const readerUnderTest = new PayloadStreamReader(mockStream, {
290365
mockKind: (it) => it, // obj processor that just returns the same obj
291366
});
292367
readerUnderTest.addPayloadListener((it) => {
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1-
import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader';
1+
import {
2+
FDv2EventsCollection,
3+
Payload,
4+
PayloadListener,
5+
PayloadProcessor,
6+
Update,
7+
} from './payloadProcessor';
8+
import { PayloadStreamReader } from './payloadStreamReader';
29

3-
export { Payload, PayloadListener, PayloadReader, Update };
10+
export {
11+
FDv2EventsCollection,
12+
Payload,
13+
PayloadListener,
14+
PayloadProcessor,
15+
PayloadStreamReader,
16+
Update,
17+
};

0 commit comments

Comments
 (0)