Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
285fc9f
feat: adds CompositeDataSource for FDv2
tanderson-ld Feb 7, 2025
3e9953b
implementing status based scheduled transitions and backoff support
tanderson-ld Feb 21, 2025
80582bc
Refactoring backoff and transitioning handling logic to handle more e…
tanderson-ld Feb 24, 2025
eeaa3d1
removing accidental source include
tanderson-ld Feb 24, 2025
6d0a08b
Fixing review comments.
tanderson-ld Mar 4, 2025
ea9845d
Addressing review comments
tanderson-ld Mar 14, 2025
33c873e
Fixing cancellation token issue
tanderson-ld Mar 14, 2025
fa634f5
Additional changes after working on contract test integration
tanderson-ld Mar 24, 2025
a3d2489
chore: Adds LDDataSystemOptions for configuring the Data System.
tanderson-ld Mar 4, 2025
d241af3
tests and some restructuring/fixes
tanderson-ld Mar 17, 2025
bb47dde
fixing unit test name
tanderson-ld Mar 18, 2025
5e6600e
Additional changes after contract tests integration
tanderson-ld Mar 24, 2025
6904c99
feat: hooks up FDv2 configuration and contract tests to client impl
tanderson-ld Mar 27, 2025
a0f5398
feat: adds polling synchronizer support
Apr 2, 2025
9b9abf3
wip
Apr 7, 2025
2e3a4fd
Merge branch 'ta/fdv2-temporary-holding' into ta/sdk-851/polling-sync…
Apr 7, 2025
2a0fdd2
Self review comments and fixing intent none handling
Apr 8, 2025
893f09f
Removing deprecations that didn't merge corrrectly
Apr 8, 2025
eebf115
addressing review comments
Apr 10, 2025
87bcec4
Adding unit tests
Apr 10, 2025
80f8d67
Adding one shot initializer tests
Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion contract-tests/sdkClientEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) {

const maybeTime = (seconds) =>
seconds === undefined || seconds === null ? undefined : seconds / 1000;

if (options.streaming) {
cf.streamUri = options.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
Expand All @@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) {
if (options.polling) {
cf.stream = false;
cf.baseUri = options.polling.baseUri;
cf.pollInterface = options.polling.pollIntervalMs / 1000;
cf.pollInterval = options.polling.pollIntervalMs / 1000;
if (options.polling.filter) {
cf.payloadFilterKey = options.polling.filter;
}
Expand Down Expand Up @@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) {
cf.wrapperVersion = options.wrapper.version;
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: Hooks up configuration for contract tests.

if (options.dataSystem) {
const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming;
const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling;

if (dataSourceStreamingOptions) {
cf.streamUri = dataSourceStreamingOptions.baseUri;
cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs);
if (dataSourceStreamingOptions.filter) {
cf.payloadFilterKey = dataSourceStreamingOptions.filter;
}
}
if (dataSourcePollingOptions) {
cf.stream = false;
cf.baseUri = dataSourcePollingOptions.baseUri;
cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000;
if (dataSourcePollingOptions.filter) {
cf.payloadFilterKey = dataSourcePollingOptions.filter;
}
}

let dataSourceOptions;
if (dataSourceStreamingOptions && dataSourcePollingOptions) {
dataSourceOptions = {
type: 'standard',
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
...(dataSourcePollingOptions.pollIntervalMs != null &&
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
}
} else if (dataSourceStreamingOptions) {
dataSourceOptions = {
type: 'streamingOnly',
...(dataSourceStreamingOptions.initialRetryDelayMs != null &&
{ streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }),
}
} else if (dataSourcePollingOptions) {
dataSourceOptions = {
type: 'pollingOnly',
...(dataSourcePollingOptions.pollIntervalMs != null &&
{ pollInterval: dataSourcePollingOptions.pollIntervalMs }),
}
} else {
// No data source options were specified
dataSourceOptions = undefined;
}

if (options.dataSystem.payloadFilter) {
cf.payloadFilterKey = options.dataSystem.payloadFilter;
}

cf.dataSystem = {
dataSource: dataSourceOptions,
}
}

return cf;
}

Expand Down
9 changes: 9 additions & 0 deletions contract-tests/testharness-suppressions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
streaming/validation/drop and reconnect if stream event has malformed JSON
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema

streaming/fdv2/reconnection state management/initializes from polling initializer
streaming/fdv2/reconnection state management/initializes from 2 polling initializers

streaming/fdv2/reconnection state management/saves previously known state
streaming/fdv2/reconnection state management/replaces previously known state
streaming/fdv2/reconnection state management/updates previously known state
streaming/fdv2/ignores model version
streaming/fdv2/can discard partial events on errors
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These suppressions are the fdv2 cases that aren't passing at moment. The state and model ones will be addressed in a future story.

The streaming/fdv2/reconnection state management/initializes from polling initializer one is trickier. The JS configuration does not support the same level of initializer customization that the test harness and Go SDK support, so those cases are not supported.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { EventListener, EventName, LDLogger } from '../../../src/api';
import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader';
import { Payload } from '../../../src/internal/fdv2/payloadProcessor';
import { EventStream, PayloadStreamReader } from '../../../src/internal/fdv2/payloadStreamReader';


class MockEventStream implements EventStream {
private _listeners: Record<EventName, EventListener> = {};
Expand All @@ -16,7 +18,7 @@ class MockEventStream implements EventStream {
it('it sets basis to true when intent code is xfer-full', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand All @@ -38,7 +40,7 @@ it('it sets basis to true when intent code is xfer-full', () => {
it('it sets basis to false when intent code is xfer-changes', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand All @@ -57,10 +59,71 @@ it('it sets basis to false when intent code is xfer-changes', () => {
expect(receivedPayloads[0].basis).toEqual(false);
});

it('it sets basis to false and emits empty payload when intent code is none', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"payloads": [{"code": "none", "id": "mockId", "target": 42}]}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].version).toEqual(42);
expect(receivedPayloads[0].basis).toEqual(false);
});

it('it handles xfer-full then xfer-changes', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});

mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
expect(receivedPayloads.length).toEqual(2);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
expect(receivedPayloads[0].updates.length).toEqual(1);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);

expect(receivedPayloads[1].id).toEqual('mockId');
expect(receivedPayloads[1].state).toEqual('mockState');
expect(receivedPayloads[1].basis).toEqual(false);
expect(receivedPayloads[1].updates.length).toEqual(1);
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' });
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
});

it('it includes multiple types of updates in payload', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand Down Expand Up @@ -98,7 +161,7 @@ it('it includes multiple types of updates in payload', () => {
it('it does not include messages thats are not between server-intent and payloader-transferred', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand Down Expand Up @@ -131,7 +194,7 @@ it('logs prescribed message when goodbye event is encountered', () => {
};
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(
const readerUnderTest = new PayloadStreamReader(
mockStream,
{
mockKind: (it) => it, // obj processor that just returns the same obj
Expand Down Expand Up @@ -162,7 +225,7 @@ it('logs prescribed message when error event is encountered', () => {
};
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(
const readerUnderTest = new PayloadStreamReader(
mockStream,
{
mockKind: (it) => it, // obj processor that just returns the same obj
Expand All @@ -183,12 +246,15 @@ it('logs prescribed message when error event is encountered', () => {
mockStream.simulateEvent('error', {
data: '{"reason": "Womp womp"}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
expect(receivedPayloads.length).toEqual(0);
expect(receivedPayloads.length).toEqual(1);
expect(mockLogger.info).toHaveBeenCalledWith(
'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.',
'An issue was encountered receiving updates for payload mockId with reason: Womp womp.',
);
});

Expand All @@ -201,7 +267,7 @@ it('discards partially transferred data when an error is encountered', () => {
};
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(
const readerUnderTest = new PayloadStreamReader(
mockStream,
{
mockKind: (it) => it, // obj processor that just returns the same obj
Expand All @@ -222,6 +288,9 @@ it('discards partially transferred data when an error is encountered', () => {
mockStream.simulateEvent('error', {
data: '{"reason": "Womp womp"}',
});
mockStream.simulateEvent('put-object', {
data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState", "version": 1}',
});
Expand All @@ -240,23 +309,29 @@ it('discards partially transferred data when an error is encountered', () => {
mockStream.simulateEvent('payload-transferred', {
data: '{"state": "mockState2", "version": 1}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId2');
expect(receivedPayloads[0].state).toEqual('mockState2');
expect(receivedPayloads.length).toEqual(2);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
expect(receivedPayloads[0].updates.length).toEqual(3);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
expect(receivedPayloads[0].updates.length).toEqual(1);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' });
expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined);
expect(receivedPayloads[0].updates[1].object).toEqual(undefined);
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined);
expect(receivedPayloads[1].id).toEqual('mockId2');
expect(receivedPayloads[1].state).toEqual('mockState2');
expect(receivedPayloads[1].basis).toEqual(true);
expect(receivedPayloads[1].updates.length).toEqual(3);
expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' });
expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined);
expect(receivedPayloads[1].updates[1].object).toEqual(undefined);
expect(receivedPayloads[1].updates[1].deleted).toEqual(true);
expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' });
expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined);
});

it('silently ignores unrecognized kinds', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand Down Expand Up @@ -286,7 +361,7 @@ it('silently ignores unrecognized kinds', () => {
it('ignores additional payloads beyond the first payload in the server-intent message', () => {
const mockStream = new MockEventStream();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
const readerUnderTest = new PayloadStreamReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
Expand Down
11 changes: 9 additions & 2 deletions packages/shared/common/src/internal/fdv2/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader';
import {
EventsSummary,
Payload,
PayloadListener,
PayloadProcessor,
Update,
} from './payloadProcessor';
import { PayloadStreamReader } from './payloadStreamReader';

export { Payload, PayloadListener, PayloadReader, Update };
export { EventsSummary, Payload, PayloadListener, PayloadProcessor, PayloadStreamReader, Update };
Loading