Skip to content

Commit 6904c99

Browse files
committed
feat: hooks up FDv2 configuration and contract tests to client impl
1 parent 5e6600e commit 6904c99

File tree

7 files changed

+343
-83
lines changed

7 files changed

+343
-83
lines changed

contract-tests/sdkClientEntity.js

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) {
2323

2424
const maybeTime = (seconds) =>
2525
seconds === undefined || seconds === null ? undefined : seconds / 1000;
26+
2627
if (options.streaming) {
2728
cf.streamUri = options.streaming.baseUri;
2829
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
@@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) {
3334
if (options.polling) {
3435
cf.stream = false;
3536
cf.baseUri = options.polling.baseUri;
36-
cf.pollInterface = options.polling.pollIntervalMs / 1000;
37+
cf.pollInterval = options.polling.pollIntervalMs / 1000;
3738
if (options.polling.filter) {
3839
cf.payloadFilterKey = options.polling.filter;
3940
}
@@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) {
8182
cf.wrapperVersion = options.wrapper.version;
8283
}
8384
}
85+
if (options.dataSystem) {
86+
const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming;
87+
const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling;
88+
89+
if (dataSourceStreamingOptions) {
90+
cf.streamUri = dataSourceStreamingOptions.baseUri;
91+
cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs);
92+
if (dataSourceStreamingOptions.filter) {
93+
cf.payloadFilterKey = dataSourceStreamingOptions.filter;
94+
}
95+
}
96+
if (dataSourcePollingOptions) {
97+
cf.stream = false;
98+
cf.baseUri = dataSourcePollingOptions.baseUri;
99+
cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000;
100+
if (dataSourcePollingOptions.filter) {
101+
cf.payloadFilterKey = dataSourcePollingOptions.filter;
102+
}
103+
}
104+
105+
let dataSourceOptions;
106+
if (dataSourceStreamingOptions && dataSourcePollingOptions) {
107+
dataSourceOptions = {
108+
type: 'standard',
109+
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
110+
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
111+
...(dataSourcePollingOptions.pollIntervalMs != null &&
112+
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
113+
}
114+
} else if (dataSourceStreamingOptions) {
115+
dataSourceOptions = {
116+
type: 'streamingOnly',
117+
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
118+
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
119+
}
120+
} else if (dataSourcePollingOptions) {
121+
dataSourceOptions = {
122+
type: 'pollingOnly',
123+
...(dataSourcePollingOptions.pollIntervalMs != null &&
124+
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
125+
}
126+
} else {
127+
// No data source options were specified
128+
dataSourceOptions = undefined;
129+
}
130+
131+
if (options.dataSystem.payloadFilter) {
132+
cf.payloadFilterKey = options.dataSystem.payloadFilter;
133+
}
134+
135+
cf.dataSystem = {
136+
dataSource: dataSourceOptions,
137+
}
138+
}
139+
84140
return cf;
85141
}
86142

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,16 @@
1+
polling/requests
2+
polling/payload/large payloads
3+
14
streaming/validation/drop and reconnect if stream event has malformed JSON
25
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema
6+
7+
wrapper/poll requests/wrapper name and version
8+
9+
streaming/fdv2/reconnection state management/initializes from polling initializer
10+
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
11+
12+
streaming/fdv2/reconnection state management/saves previously known state
13+
streaming/fdv2/reconnection state management/replaces previously known state
14+
streaming/fdv2/reconnection state management/updates previously known state
15+
streaming/fdv2/ignores model version
16+
streaming/fdv2/can discard partial events on errors

packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,48 @@ it('it sets basis to false when intent code is xfer-changes', () => {
5757
expect(receivedPayloads[0].basis).toEqual(false);
5858
});
5959

60+
it('it handles xfer-full then xfer-changes', () => {
61+
const mockStream = new MockEventStream();
62+
const receivedPayloads: Payload[] = [];
63+
const readerUnderTest = new PayloadReader(mockStream, {
64+
mockKind: (it) => it, // obj processor that just returns the same obj
65+
});
66+
readerUnderTest.addPayloadListener((it) => {
67+
receivedPayloads.push(it);
68+
});
69+
70+
mockStream.simulateEvent('server-intent', {
71+
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
72+
});
73+
mockStream.simulateEvent('put-object', {
74+
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
75+
});
76+
mockStream.simulateEvent('payload-transferred', {
77+
data: '{"state": "mockState", "version": 1}',
78+
});
79+
80+
mockStream.simulateEvent('put-object', {
81+
data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}',
82+
});
83+
mockStream.simulateEvent('payload-transferred', {
84+
data: '{"state": "mockState", "version": 1}',
85+
});
86+
expect(receivedPayloads.length).toEqual(2);
87+
expect(receivedPayloads[0].id).toEqual('mockId');
88+
expect(receivedPayloads[0].state).toEqual('mockState');
89+
expect(receivedPayloads[0].basis).toEqual(true);
90+
expect(receivedPayloads[0].updates.length).toEqual(1);
91+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
92+
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
93+
94+
expect(receivedPayloads[1].id).toEqual('mockId');
95+
expect(receivedPayloads[1].state).toEqual('mockState');
96+
expect(receivedPayloads[1].basis).toEqual(false);
97+
expect(receivedPayloads[1].updates.length).toEqual(1);
98+
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' });
99+
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
100+
});
101+
60102
it('it includes multiple types of updates in payload', () => {
61103
const mockStream = new MockEventStream();
62104
const receivedPayloads: Payload[] = [];
@@ -183,12 +225,15 @@ it('logs prescribed message when error event is encountered', () => {
183225
mockStream.simulateEvent('error', {
184226
data: '{"reason": "Womp womp"}',
185227
});
228+
mockStream.simulateEvent('put-object', {
229+
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
230+
});
186231
mockStream.simulateEvent('payload-transferred', {
187232
data: '{"state": "mockState", "version": 1}',
188233
});
189-
expect(receivedPayloads.length).toEqual(0);
234+
expect(receivedPayloads.length).toEqual(1);
190235
expect(mockLogger.info).toHaveBeenCalledWith(
191-
'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.',
236+
'An issue was encountered receiving updates for payload mockId with reason: Womp womp.',
192237
);
193238
});
194239

@@ -222,6 +267,9 @@ it('discards partially transferred data when an error is encountered', () => {
222267
mockStream.simulateEvent('error', {
223268
data: '{"reason": "Womp womp"}',
224269
});
270+
mockStream.simulateEvent('put-object', {
271+
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
272+
});
225273
mockStream.simulateEvent('payload-transferred', {
226274
data: '{"state": "mockState", "version": 1}',
227275
});
@@ -240,17 +288,23 @@ it('discards partially transferred data when an error is encountered', () => {
240288
mockStream.simulateEvent('payload-transferred', {
241289
data: '{"state": "mockState2", "version": 1}',
242290
});
243-
expect(receivedPayloads.length).toEqual(1);
244-
expect(receivedPayloads[0].id).toEqual('mockId2');
245-
expect(receivedPayloads[0].state).toEqual('mockState2');
291+
expect(receivedPayloads.length).toEqual(2);
292+
expect(receivedPayloads[0].id).toEqual('mockId');
293+
expect(receivedPayloads[0].state).toEqual('mockState');
246294
expect(receivedPayloads[0].basis).toEqual(true);
247-
expect(receivedPayloads[0].updates.length).toEqual(3);
248-
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
295+
expect(receivedPayloads[0].updates.length).toEqual(1);
296+
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' });
249297
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
250-
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
251-
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
252-
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
253-
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
298+
expect(receivedPayloads[1].id).toEqual('mockId2');
299+
expect(receivedPayloads[1].state).toEqual('mockState2');
300+
expect(receivedPayloads[1].basis).toEqual(true);
301+
expect(receivedPayloads[1].updates.length).toEqual(3);
302+
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
303+
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
304+
expect(receivedPayloads[1].updates[1].object).toEqual(undefined);
305+
expect(receivedPayloads[1].updates[1].deleted).toEqual(true);
306+
expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
307+
expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined);
254308
});
255309

256310
it('silently ignores unrecognized kinds', () => {

packages/shared/common/src/internal/fdv2/payloadReader.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export class PayloadReader {
4444
private _listeners: PayloadListener[] = [];
4545

4646
private _tempId?: string = undefined;
47-
private _tempBasis?: boolean = undefined;
47+
private _tempBasis: boolean = false;
4848
private _tempUpdates: Update[] = [];
4949

5050
/**
@@ -105,7 +105,7 @@ export class PayloadReader {
105105

106106
private _processServerIntent = (data: ServerIntentData) => {
107107
// clear state in prep for handling data
108-
this._resetState();
108+
this._resetAll();
109109

110110
// if there's no payloads, return
111111
if (!data.payloads.length) {
@@ -133,7 +133,7 @@ export class PayloadReader {
133133
private _processPutObject = (data: PutObject) => {
134134
// if the following properties haven't been provided by now, we should ignore the event
135135
if (
136-
!this._tempId || // server intent hasn't been recieved yet.
136+
!this._tempId || // server intent hasn't been received yet.
137137
!data.kind ||
138138
!data.key ||
139139
!data.version ||
@@ -144,7 +144,7 @@ export class PayloadReader {
144144

145145
const obj = this._processObj(data.kind, data.object);
146146
if (!obj) {
147-
this._logger?.warn(`Unable to prcoess object for kind: '${data.kind}'`);
147+
this._logger?.warn(`Unable to process object for kind: '${data.kind}'`);
148148
// ignore unrecognized kinds
149149
return;
150150
}
@@ -178,10 +178,9 @@ export class PayloadReader {
178178
if (
179179
!this._tempId || // server intent hasn't been recieved yet.
180180
!data.state ||
181-
!data.version ||
182-
this._tempBasis === undefined
181+
!data.version
183182
) {
184-
this._resetState(); // a reset is best defensive action since payload transferred terminates a payload
183+
this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload
185184
return;
186185
}
187186

@@ -194,26 +193,35 @@ export class PayloadReader {
194193
};
195194

196195
this._listeners.forEach((it) => it(payload));
197-
this._resetState();
196+
this._resetAfterEmission();
198197
};
199198

200199
private _processGoodbye = (data: any) => {
201200
this._logger?.info(
202201
`Goodbye was received from the LaunchDarkly connection with reason: ${data.reason}.`,
203202
);
204-
this._resetState();
203+
this._resetAll();
205204
};
206205

207206
private _processError = (data: any) => {
208207
this._logger?.info(
209-
`An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}. Automatic retry will occur.`,
208+
`An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}.`,
210209
);
211-
this._resetState();
210+
this._resetAfterError();
212211
};
213212

214-
private _resetState() {
213+
private _resetAfterEmission() {
214+
this._tempBasis = false;
215+
this._tempUpdates = [];
216+
}
217+
218+
private _resetAfterError() {
219+
this._tempUpdates = [];
220+
}
221+
222+
private _resetAll() {
215223
this._tempId = undefined;
216-
this._tempBasis = undefined;
224+
this._tempBasis = false;
217225
this._tempUpdates = [];
218226
}
219227
}

0 commit comments

Comments
 (0)