Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion contract-tests/sdkClientEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) {

const maybeTime = (seconds) =>
seconds === undefined || seconds === null ? undefined : seconds / 1000;

if (options.streaming) {
cf.streamUri = options.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
Expand All @@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) {
if (options.polling) {
cf.stream = false;
cf.baseUri = options.polling.baseUri;
cf.pollInterface = options.polling.pollIntervalMs / 1000;
cf.pollInterval = options.polling.pollIntervalMs / 1000;
if (options.polling.filter) {
cf.payloadFilterKey = options.polling.filter;
}
Expand Down Expand Up @@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) {
cf.wrapperVersion = options.wrapper.version;
}
}
if (options.dataSystem) {
const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming;
const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling;

if (dataSourceStreamingOptions) {
cf.streamUri = dataSourceStreamingOptions.baseUri;
cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs);
if (dataSourceStreamingOptions.filter) {
cf.payloadFilterKey = dataSourceStreamingOptions.filter;
}
}
if (dataSourcePollingOptions) {
cf.stream = false;
cf.baseUri = dataSourcePollingOptions.baseUri;
cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000;
if (dataSourcePollingOptions.filter) {
cf.payloadFilterKey = dataSourcePollingOptions.filter;
}
}

let dataSourceOptions;
if (dataSourceStreamingOptions && dataSourcePollingOptions) {
dataSourceOptions = {
type: 'standard',
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
...(dataSourcePollingOptions.pollIntervalMs != null &&
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
}
} else if (dataSourceStreamingOptions) {
dataSourceOptions = {
type: 'streamingOnly',
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
}
} else if (dataSourcePollingOptions) {
dataSourceOptions = {
type: 'pollingOnly',
...(dataSourcePollingOptions.pollIntervalMs != null &&
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
}
} else {
// No data source options were specified
dataSourceOptions = undefined;
}

if (options.dataSystem.payloadFilter) {
cf.payloadFilterKey = options.dataSystem.payloadFilter;
}

cf.dataSystem = {
dataSource: dataSourceOptions,
}
}

return cf;
}

Expand Down
14 changes: 14 additions & 0 deletions contract-tests/testharness-suppressions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,16 @@
polling/requests
polling/payload/large payloads

streaming/validation/drop and reconnect if stream event has malformed JSON
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema

wrapper/poll requests/wrapper name and version

streaming/fdv2/reconnection state management/initializes from polling initializer
streaming/fdv2/reconnection state management/initializes from 2 polling initializers

streaming/fdv2/reconnection state management/saves previously known state
streaming/fdv2/reconnection state management/replaces previously known state
streaming/fdv2/reconnection state management/updates previously known state
streaming/fdv2/ignores model version
streaming/fdv2/can discard partial events on errors
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,48 @@ it('it sets basis to false when intent code is xfer-changes', () => {
expect(receivedPayloads[0].basis).toEqual(false);
});

it('it handles xfer-full then xfer-changes', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});

mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
expect(receivedPayloads.length).toEqual(2);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
expect(receivedPayloads[0].updates.length).toEqual(1);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);

expect(receivedPayloads[1].id).toEqual('mockId');
expect(receivedPayloads[1].state).toEqual('mockState');
expect(receivedPayloads[1].basis).toEqual(false);
expect(receivedPayloads[1].updates.length).toEqual(1);
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' });
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
});

it('it includes multiple types of updates in payload', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
Expand Down Expand Up @@ -95,7 +137,7 @@ it('it includes multiple types of updates in payload', () => {
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
});

it('it does not include messages thats are not between server-intent and payloader-transferred', () => {
it('it does not include messages thats are not between server-intent and payload-transferred', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
Expand Down Expand Up @@ -183,12 +225,15 @@ it('logs prescribed message when error event is encountered', () => {
mockStream.simulateEvent('error', {
data: '{"reason": "Womp womp"}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
expect(receivedPayloads.length).toEqual(0);
expect(receivedPayloads.length).toEqual(1);
expect(mockLogger.info).toHaveBeenCalledWith(
'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.',
'An issue was encountered receiving updates for payload mockId with reason: Womp womp.',
);
});

Expand Down Expand Up @@ -222,6 +267,9 @@ it('discards partially transferred data when an error is encountered', () => {
mockStream.simulateEvent('error', {
data: '{"reason": "Womp womp"}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
Expand All @@ -240,17 +288,23 @@ it('discards partially transferred data when an error is encountered', () => {
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState2", "version": 1}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId2');
expect(receivedPayloads[0].state).toEqual('mockState2');
expect(receivedPayloads.length).toEqual(2);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
expect(receivedPayloads[0].updates.length).toEqual(3);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
expect(receivedPayloads[0].updates.length).toEqual(1);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' });
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
expect(receivedPayloads[1].id).toEqual('mockId2');
expect(receivedPayloads[1].state).toEqual('mockState2');
expect(receivedPayloads[1].basis).toEqual(true);
expect(receivedPayloads[1].updates.length).toEqual(3);
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
expect(receivedPayloads[1].updates[1].object).toEqual(undefined);
expect(receivedPayloads[1].updates[1].deleted).toEqual(true);
expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined);
});

it('silently ignores unrecognized kinds', () => {
Expand Down
36 changes: 22 additions & 14 deletions packages/shared/common/src/internal/fdv2/payloadReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class PayloadReader {
private _listeners: PayloadListener[] = [];

private _tempId?: string = undefined;
private _tempBasis?: boolean = undefined;
private _tempBasis: boolean = false;
private _tempUpdates: Update[] = [];

/**
Expand Down Expand Up @@ -105,7 +105,7 @@ export class PayloadReader {

private _processServerIntent = (data: ServerIntentData) => {
// clear state in prep for handling data
this._resetState();
this._resetAll();

// if there's no payloads, return
if (!data.payloads.length) {
Expand Down Expand Up @@ -133,7 +133,7 @@ export class PayloadReader {
private _processPutObject = (data: PutObject) => {
// if the following properties haven't been provided by now, we should ignore the event
if (
!this._tempId || // server intent hasn't been recieved yet.
!this._tempId || // server intent hasn't been received yet.
!data.kind ||
!data.key ||
!data.version ||
Expand All @@ -144,7 +144,7 @@ export class PayloadReader {

const obj = this._processObj(data.kind, data.object);
if (!obj) {
this._logger?.warn(`Unable to prcoess object for kind: '${data.kind}'`);
this._logger?.warn(`Unable to process object for kind: '${data.kind}'`);
// ignore unrecognized kinds
return;
}
Expand Down Expand Up @@ -176,12 +176,11 @@ export class PayloadReader {
private _processPayloadTransferred = (data: PayloadTransferred) => {
// if the following properties haven't been provided by now, we should reset
if (
!this._tempId || // server intent hasn't been recieved yet.
!this._tempId || // server intent hasn't been received yet.
!data.state ||
!data.version ||
this._tempBasis === undefined
!data.version
) {
this._resetState(); // a reset is best defensive action since payload transferred terminates a payload
this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload
return;
}

Expand All @@ -194,26 +193,35 @@ export class PayloadReader {
};

this._listeners.forEach((it) => it(payload));
this._resetState();
this._resetAfterEmission();
};

private _processGoodbye = (data: any) => {
this._logger?.info(
`Goodbye was received from the LaunchDarkly connection with reason: ${data.reason}.`,
);
this._resetState();
this._resetAll();
};

private _processError = (data: any) => {
this._logger?.info(
`An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}. Automatic retry will occur.`,
`An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}.`,
);
this._resetState();
this._resetAfterError();
};

private _resetState() {
private _resetAfterEmission() {
this._tempBasis = false;
this._tempUpdates = [];
}

private _resetAfterError() {
this._tempUpdates = [];
}

private _resetAll() {
this._tempId = undefined;
this._tempBasis = undefined;
this._tempBasis = false;
this._tempUpdates = [];
}
}
Loading