diff --git a/contract-tests/sdkClientEntity.js b/contract-tests/sdkClientEntity.js index 7301ffacfa..fcb51ce9fe 100644 --- a/contract-tests/sdkClientEntity.js +++ b/contract-tests/sdkClientEntity.js @@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) { const maybeTime = (seconds) => seconds === undefined || seconds === null ? undefined : seconds / 1000; + if (options.streaming) { cf.streamUri = options.streaming.baseUri; cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs); @@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) { if (options.polling) { cf.stream = false; cf.baseUri = options.polling.baseUri; - cf.pollInterface = options.polling.pollIntervalMs / 1000; + cf.pollInterval = options.polling.pollIntervalMs / 1000; if (options.polling.filter) { cf.payloadFilterKey = options.polling.filter; } @@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) { cf.wrapperVersion = options.wrapper.version; } } + if (options.dataSystem) { + const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming; + const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling; + + if (dataSourceStreamingOptions) { + cf.streamUri = dataSourceStreamingOptions.baseUri; + cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs); + if (dataSourceStreamingOptions.filter) { + cf.payloadFilterKey = dataSourceStreamingOptions.filter; + } + } + if (dataSourcePollingOptions) { + cf.stream = false; + cf.baseUri = dataSourcePollingOptions.baseUri; + cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000; + if (dataSourcePollingOptions.filter) { + cf.payloadFilterKey = dataSourcePollingOptions.filter; + } + } + + let dataSourceOptions; + if (dataSourceStreamingOptions && dataSourcePollingOptions) { + dataSourceOptions = { + type: 'standard', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && + { streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }), + ...(dataSourcePollingOptions.pollIntervalMs != null && + { pollInterval: dataSourcePollingOptions.pollIntervalMs }), + } + } else if (dataSourceStreamingOptions) { + dataSourceOptions = { + type: 'streamingOnly', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && + { streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }), + } + } else if (dataSourcePollingOptions) { + dataSourceOptions = { + type: 'pollingOnly', + ...(dataSourcePollingOptions.pollIntervalMs != null && + { pollInterval: dataSourcePollingOptions.pollIntervalMs }), + } + } else { + // No data source options were specified + dataSourceOptions = undefined; + } + + if (options.dataSystem.payloadFilter) { + cf.payloadFilterKey = options.dataSystem.payloadFilter; + } + + cf.dataSystem = { + dataSource: dataSourceOptions, + } + } + return cf; } diff --git a/contract-tests/testharness-suppressions.txt b/contract-tests/testharness-suppressions.txt index 8a29da94d7..c7c8985d79 100644 --- a/contract-tests/testharness-suppressions.txt +++ b/contract-tests/testharness-suppressions.txt @@ -1,2 +1,11 @@ streaming/validation/drop and reconnect if stream event has malformed JSON streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema + +streaming/fdv2/reconnection state management/initializes from polling initializer 
+streaming/fdv2/reconnection state management/initializes from 2 polling initializers + +streaming/fdv2/reconnection state management/saves previously known state +streaming/fdv2/reconnection state management/replaces previously known state +streaming/fdv2/reconnection state management/updates previously known state +streaming/fdv2/ignores model version +streaming/fdv2/can discard partial events on errors \ No newline at end of file diff --git a/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts b/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts similarity index 71% rename from packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts rename to packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts index 2212062121..ba7a951292 100644 --- a/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts @@ -1,5 +1,7 @@ import { EventListener, EventName, LDLogger } from '../../../src/api'; -import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader'; +import { Payload } from '../../../src/internal/fdv2/payloadProcessor'; +import { EventStream, PayloadStreamReader } from '../../../src/internal/fdv2/payloadStreamReader'; + class MockEventStream implements EventStream { private _listeners: Record = {}; @@ -16,7 +18,7 @@ class MockEventStream implements EventStream { it('it sets basis to true when intent code is xfer-full', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { @@ -38,7 +40,7 @@ it('it sets basis to true when intent code is xfer-full', () => { it('it sets basis to false when intent code is xfer-changes', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { @@ -57,10 +59,71 @@ it('it sets basis to false when intent code is xfer-changes', () => { expect(receivedPayloads[0].basis).toEqual(false); }); +it('it sets basis to false and emits empty payload when intent code is none', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadStreamReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "none", "id": "mockId", "target": 42}]}', + }); + expect(receivedPayloads.length).toEqual(1); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].version).toEqual(42); + expect(receivedPayloads[0].basis).toEqual(false); +}); + +it('it handles xfer-full then xfer-changes', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadStreamReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + 
receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(2); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); + expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); + + expect(receivedPayloads[1].id).toEqual('mockId'); + expect(receivedPayloads[1].state).toEqual('mockState'); + expect(receivedPayloads[1].basis).toEqual(false); + expect(receivedPayloads[1].updates.length).toEqual(1); + expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' }); + expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined); +}); + it('it includes multiple types of updates in payload', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { @@ -98,7 +161,7 @@ it('it includes multiple types of updates in payload', () => { it('it does not include messages thats are not between server-intent and payloader-transferred', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { @@ -131,7 +194,7 @@ it('logs prescribed message when goodbye event is encountered', () => { }; const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader( + const readerUnderTest = new PayloadStreamReader( mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj @@ -162,7 +225,7 @@ it('logs prescribed message when error event is encountered', () => { }; const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader( + const readerUnderTest = new PayloadStreamReader( mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj @@ -183,12 +246,15 @@ it('logs prescribed message when error event is encountered', () => { mockStream.simulateEvent('error', { data: '{"reason": "Womp womp"}', }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}', + }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', }); - 
expect(receivedPayloads.length).toEqual(0); + expect(receivedPayloads.length).toEqual(1); expect(mockLogger.info).toHaveBeenCalledWith( - 'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.', + 'An issue was encountered receiving updates for payload mockId with reason: Womp womp.', ); }); @@ -201,7 +267,7 @@ it('discards partially transferred data when an error is encountered', () => { }; const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader( + const readerUnderTest = new PayloadStreamReader( mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj @@ -222,6 +288,9 @@ it('discards partially transferred data when an error is encountered', () => { mockStream.simulateEvent('error', { data: '{"reason": "Womp womp"}', }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}', + }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', }); @@ -240,23 +309,29 @@ it('discards partially transferred data when an error is encountered', () => { mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState2", "version": 1}', }); - expect(receivedPayloads.length).toEqual(1); - expect(receivedPayloads[0].id).toEqual('mockId2'); - expect(receivedPayloads[0].state).toEqual('mockState2'); + expect(receivedPayloads.length).toEqual(2); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); expect(receivedPayloads[0].basis).toEqual(true); - expect(receivedPayloads[0].updates.length).toEqual(3); - expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' }); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' }); expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); - expect(receivedPayloads[0].updates[1].object).toEqual(undefined); - expect(receivedPayloads[0].updates[1].deleted).toEqual(true); - expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' }); - expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined); + expect(receivedPayloads[1].id).toEqual('mockId2'); + expect(receivedPayloads[1].state).toEqual('mockState2'); + expect(receivedPayloads[1].basis).toEqual(true); + expect(receivedPayloads[1].updates.length).toEqual(3); + expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' }); + expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined); + expect(receivedPayloads[1].updates[1].object).toEqual(undefined); + expect(receivedPayloads[1].updates[1].deleted).toEqual(true); + expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' }); + expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined); }); it('silently ignores unrecognized kinds', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { @@ -286,7 +361,7 @@ it('silently ignores unrecognized kinds', () => { it('ignores additional payloads beyond the first payload in the 
server-intent message', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; - const readerUnderTest = new PayloadReader(mockStream, { + const readerUnderTest = new PayloadStreamReader(mockStream, { mockKind: (it) => it, // obj processor that just returns the same obj }); readerUnderTest.addPayloadListener((it) => { diff --git a/packages/shared/common/src/internal/fdv2/index.ts b/packages/shared/common/src/internal/fdv2/index.ts index 4c4a88773e..b07537ddd9 100644 --- a/packages/shared/common/src/internal/fdv2/index.ts +++ b/packages/shared/common/src/internal/fdv2/index.ts @@ -1,3 +1,17 @@ -import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader'; +import { + FDv2EventsCollection, + Payload, + PayloadListener, + PayloadProcessor, + Update, +} from './payloadProcessor'; +import { PayloadStreamReader } from './payloadStreamReader'; -export { Payload, PayloadListener, PayloadReader, Update }; +export { + FDv2EventsCollection, + Payload, + PayloadListener, + PayloadProcessor, + PayloadStreamReader, + Update, +}; diff --git a/packages/shared/common/src/internal/fdv2/payloadReader.ts b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts similarity index 58% rename from packages/shared/common/src/internal/fdv2/payloadReader.ts rename to packages/shared/common/src/internal/fdv2/payloadProcessor.ts index 68b1320fc3..0273ccacdb 100644 --- a/packages/shared/common/src/internal/fdv2/payloadReader.ts +++ b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts @@ -1,12 +1,13 @@ /* eslint-disable no-underscore-dangle */ -import { EventListener, EventName, LDLogger } from '../../api'; +import { LDLogger } from '../../api'; import { DataSourceErrorKind } from '../../datasource'; -import { DeleteObject, PayloadTransferred, PutObject, ServerIntentData } from './proto'; - -// Facade interface to contain only ability to add event listeners -export interface EventStream { - addEventListener(type: EventName, listener: EventListener): void; -} +import { + DeleteObject, + PayloadIntent, + PayloadTransferred, + PutObject, + ServerIntentData, +} from './proto'; // Used to define object processing between deserialization and payload listener invocation. This can be // used provide object sanitization logic. @@ -14,6 +15,17 @@ export interface ObjProcessors { [kind: string]: (object: any) => any; } +// Represents a collection of events (one case where this is seen is in the polling response) +export interface FDv2EventsCollection { + events: FDv2Event[]; +} + +// Represents a single event +export interface FDv2Event { + event: string; + data: any; +} + // Represents information for one keyed object. export interface Update { kind: string; @@ -28,7 +40,7 @@ export interface Update { export interface Payload { id: string; version: number; - state: string; + state?: string; basis: boolean; updates: Update[]; } @@ -36,38 +48,29 @@ export interface Payload { export type PayloadListener = (payload: Payload) => void; /** - * A FDv2 PayloadReader can be used to parse payloads from a stream of FDv2 events. It will send payloads + * A FDv2 PayloadProcessor can be used to parse payloads from a stream of FDv2 events. It will send payloads * to the PayloadListeners as the payloads are received. Invalid series of events may be dropped silently, - * but the payload reader will continue to operate. + * but the payload processor will continue to operate. 
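+ *
+ * A minimal usage sketch; the event shapes below mirror this change's unit tests, and the
+ * 'flag' kind with its pass-through processor is illustrative only:
+ * @example
+ * const processor = new PayloadProcessor({ flag: (obj) => obj });
+ * processor.addPayloadListener((payload) => console.log(payload.basis, payload.updates));
+ * processor.processEvents([
+ *   { event: 'server-intent', data: { payloads: [{ code: 'xfer-full', id: 'exampleId' }] } },
+ *   { event: 'put-object', data: { kind: 'flag', key: 'flagA', version: 1, object: {} } },
+ *   { event: 'payload-transferred', data: { state: 'exampleState', version: 1 } },
+ * ]);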
*/ -export class PayloadReader { +export class PayloadProcessor { private _listeners: PayloadListener[] = []; private _tempId?: string = undefined; - private _tempBasis?: boolean = undefined; + private _tempBasis: boolean = false; private _tempUpdates: Update[] = []; /** - * Creates a PayloadReader + * Creates a PayloadProcessor * - * @param eventStream event stream of FDv2 events * @param _objProcessors defines object processors for each object kind. - * @param _errorHandler that will be called with errors as they are encountered + * @param _errorHandler that will be called with parsing errors as they are encountered * @param _logger for logging */ constructor( - eventStream: EventStream, private readonly _objProcessors: ObjProcessors, private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void, private readonly _logger?: LDLogger, - ) { - this._attachHandler(eventStream, 'server-intent', this._processServerIntent); - this._attachHandler(eventStream, 'put-object', this._processPutObject); - this._attachHandler(eventStream, 'delete-object', this._processDeleteObject); - this._attachHandler(eventStream, 'payload-transferred', this._processPayloadTransferred); - this._attachHandler(eventStream, 'goodbye', this._processGoodbye); - this._attachHandler(eventStream, 'error', this._processError); - } + ) {} addPayloadListener(listener: PayloadListener) { this._listeners.push(listener); @@ -80,21 +83,41 @@ export class PayloadReader { } } - private _attachHandler(stream: EventStream, eventName: string, processor: (obj: any) => void) { - stream.addEventListener(eventName, async (event?: { data?: string }) => { - if (event?.data) { - this._logger?.debug(`Received ${eventName} event. Data is ${event.data}`); - try { - processor(JSON.parse(event.data)); - } catch { - this._logger?.error( - `Stream received data that was unable to be processed in "${eventName}" message`, - ); - this._logger?.debug(`Data follows: ${event.data}`); - this._errorHandler?.(DataSourceErrorKind.InvalidData, 'Malformed data in event stream'); + /** + * Gives the {@link PayloadProcessor} a series of events that it will statefully, incrementally process. + * This may lead to listeners being invoked as necessary. 
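+   * Events with unrecognized names are silently skipped, and object events that arrive before a
+   * server-intent are ignored.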
+ * @param events to be processed (can be a single element) + */ + processEvents(events: FDv2Event[]) { + events.forEach((event) => { + switch (event.event) { + case 'server-intent': { + this._processServerIntent(event.data); + break; + } + case 'put-object': { + this._processPutObject(event.data); + break; + } + case 'delete-object': { + this._processDeleteObject(event.data); + break; + } + case 'payload-transferred': { + this._processPayloadTransferred(event.data); + break; + } + case 'goodbye': { + this._processGoodbye(event.data); + break; + } + case 'error': { + this._processError(event.data); + break; + } + default: { + // no-op, unrecognized } - } else { - this._errorHandler?.(DataSourceErrorKind.Unknown, 'Unexpected message from event stream'); } }); } @@ -105,7 +128,7 @@ export class PayloadReader { private _processServerIntent = (data: ServerIntentData) => { // clear state in prep for handling data - this._resetState(); + this._resetAll(); // if there's no payloads, return if (!data.payloads.length) { @@ -119,8 +142,11 @@ export class PayloadReader { this._tempBasis = true; break; case 'xfer-changes': + this._tempBasis = false; + break; case 'none': this._tempBasis = false; + this._processIntentNone(payload); break; default: // unrecognized intent code, return @@ -133,7 +159,7 @@ export class PayloadReader { private _processPutObject = (data: PutObject) => { // if the following properties haven't been provided by now, we should ignore the event if ( - !this._tempId || // server intent hasn't been recieved yet. + !this._tempId || // server intent hasn't been received yet. !data.kind || !data.key || !data.version || @@ -144,7 +170,7 @@ export class PayloadReader { const obj = this._processObj(data.kind, data.object); if (!obj) { - this._logger?.warn(`Unable to prcoess object for kind: '${data.kind}'`); + this._logger?.warn(`Unable to process object for kind: '${data.kind}'`); // ignore unrecognized kinds return; } @@ -173,15 +199,32 @@ export class PayloadReader { }); }; + private _processIntentNone = (intent: PayloadIntent) => { + // if the following properties aren't present ignore the event + if (!intent.id || !intent.target) { + return; + } + + const payload: Payload = { + id: intent.id, + version: intent.target, + basis: false, // intent none is always not a basis + updates: [], // payload with no updates to hide the intent none concept from the consumer + // note: state is absent here as that only appears in payload transferred events + }; + + this._listeners.forEach((it) => it(payload)); + this._resetAfterEmission(); + }; + private _processPayloadTransferred = (data: PayloadTransferred) => { // if the following properties haven't been provided by now, we should reset if ( - !this._tempId || // server intent hasn't been recieved yet. + !this._tempId || // server intent hasn't been received yet. 
!data.state || - !data.version || - this._tempBasis === undefined + !data.version ) { - this._resetState(); // a reset is best defensive action since payload transferred terminates a payload + this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload return; } @@ -194,26 +237,35 @@ export class PayloadReader { }; this._listeners.forEach((it) => it(payload)); - this._resetState(); + this._resetAfterEmission(); }; private _processGoodbye = (data: any) => { this._logger?.info( `Goodbye was received from the LaunchDarkly connection with reason: ${data.reason}.`, ); - this._resetState(); + this._resetAll(); }; private _processError = (data: any) => { this._logger?.info( - `An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}. Automatic retry will occur.`, + `An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}.`, ); - this._resetState(); + this._resetAfterError(); }; - private _resetState() { + private _resetAfterEmission() { + this._tempBasis = false; + this._tempUpdates = []; + } + + private _resetAfterError() { + this._tempUpdates = []; + } + + private _resetAll() { this._tempId = undefined; - this._tempBasis = undefined; + this._tempBasis = false; this._tempUpdates = []; } } diff --git a/packages/shared/common/src/internal/fdv2/payloadStreamReader.ts b/packages/shared/common/src/internal/fdv2/payloadStreamReader.ts new file mode 100644 index 0000000000..6548a6ce60 --- /dev/null +++ b/packages/shared/common/src/internal/fdv2/payloadStreamReader.ts @@ -0,0 +1,71 @@ +/* eslint-disable no-underscore-dangle */ +import { EventListener, EventName, LDLogger } from '../../api'; +import { DataSourceErrorKind } from '../../datasource'; +import { ObjProcessors, PayloadListener, PayloadProcessor } from './payloadProcessor'; + +/** + * Interface for an event stream. Only allows listening to events. + */ +export interface EventStream { + addEventListener(type: EventName, listener: EventListener): void; +} + +/** + * A FDv2 PayloadStreamReader can be used to parse payloads from a stream of FDv2 events. See {@link PayloadProcessor} + * for more details. + */ +export class PayloadStreamReader { + private _payloadProcessor: PayloadProcessor; + + /** + * Creates a PayloadStreamReader + * + * @param eventStream event stream of FDv2 events + * @param _objProcessors defines object processors for each object kind. 
+ * @param _errorHandler that will be called with parsing errors as they are encountered + * @param _logger for logging + */ + constructor( + eventStream: EventStream, + _objProcessors: ObjProcessors, + private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void, + private readonly _logger?: LDLogger, + ) { + this._attachHandler(eventStream, 'server-intent'); + this._attachHandler(eventStream, 'put-object'); + this._attachHandler(eventStream, 'delete-object'); + this._attachHandler(eventStream, 'payload-transferred'); + this._attachHandler(eventStream, 'goodbye'); + this._attachHandler(eventStream, 'error'); + this._payloadProcessor = new PayloadProcessor(_objProcessors, _errorHandler, _logger); + } + + addPayloadListener(listener: PayloadListener) { + this._payloadProcessor.addPayloadListener(listener); + } + + removePayloadListener(listener: PayloadListener) { + this._payloadProcessor.removePayloadListener(listener); + } + + private _attachHandler(stream: EventStream, eventName: string) { + stream.addEventListener(eventName, async (event?: { data?: string }) => { + if (event?.data) { + this._logger?.debug(`Received ${eventName} event. Data is ${event.data}`); + try { + this._payloadProcessor.processEvents([ + { event: eventName, data: JSON.parse(event.data) }, + ]); + } catch { + this._logger?.error( + `Stream received data that was unable to be processed in "${eventName}" message`, + ); + this._logger?.debug(`Data follows: ${event.data}`); + this._errorHandler?.(DataSourceErrorKind.InvalidData, 'Malformed data in EventStream.'); + } + } else { + this._errorHandler?.(DataSourceErrorKind.Unknown, 'Event from EventStream missing data.'); + } + }); + } +} diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts new file mode 100644 index 0000000000..a590d6f03a --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -0,0 +1,81 @@ +import { DataSourceErrorKind, LDPollingError, subsystem } from '../../src'; +import OneShotInitializerFDv2 from '../../src/data_sources/OneShotInitializerFDv2'; +import PollingProcessorFDv2 from '../../src/data_sources/PollingProcessorFDv2'; +import Requestor from '../../src/data_sources/Requestor'; +import TestLogger, { LogLevel } from '../Logger'; + +describe('given a one shot initializer', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let initializer: OneShotInitializerFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + let testLogger: TestLogger; + + beforeEach(() => { + testLogger = new TestLogger(); + initializer = new OneShotInitializerFDv2( + requestor as unknown as Requestor, + testLogger, + ); + }); + + afterEach(() => { + initializer.stop(); + jest.restoreAllMocks(); + }); + + it('makes no requests before being started', () => { + expect(requestor.requestAllData).not.toHaveBeenCalled(); + }); + + it('polls immediately on start', () => { + initializer.start(mockDataCallback, mockStatusCallback); + 
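// one-shot behavior: start() issues exactly one request and never schedules a follow-up poll +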
expect(requestor.requestAllData).toHaveBeenCalledTimes(1); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + }); + + it('calls callback on success', () => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + initializer.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }); + }); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts index 05ae9ff282..9a38a55d4c 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts @@ -1,13 +1,9 @@ -import { ClientContext } from '@launchdarkly/js-sdk-common'; - import { LDFeatureStore } from '../../src'; import PollingProcessor from '../../src/data_sources/PollingProcessor'; import Requestor from '../../src/data_sources/Requestor'; -import Configuration from '../../src/options/Configuration'; import AsyncStoreFacade from '../../src/store/AsyncStoreFacade'; import InMemoryFeatureStore from '../../src/store/InMemoryFeatureStore'; import VersionedDataKinds from '../../src/store/VersionedDataKinds'; -import { createBasicPlatform } from '../createBasicPlatform'; import TestLogger, { LogLevel } from '../Logger'; describe('given an event processor', () => { @@ -23,24 +19,19 @@ describe('given an event processor', () => { let store: LDFeatureStore; let storeFacade: AsyncStoreFacade; - let config: Configuration; let processor: PollingProcessor; let initSuccessHandler: jest.Mock; beforeEach(() => { store = new InMemoryFeatureStore(); storeFacade = new AsyncStoreFacade(store); - config = new Configuration({ - featureStore: store, - pollInterval: longInterval, - logger: new TestLogger(), - }); initSuccessHandler = jest.fn(); processor = new PollingProcessor( - config, requestor as unknown as Requestor, - config.featureStoreFactory(new ClientContext('', config, createBasicPlatform())), + longInterval, + store, + new TestLogger(), initSuccessHandler, ); }); @@ -87,27 +78,22 @@ describe('given a polling processor with a short poll duration', () => { const jsonData = JSON.stringify(allData); let store: LDFeatureStore; - let config: Configuration; + let testLogger: TestLogger; let processor: PollingProcessor; let initSuccessHandler: jest.Mock; let errorHandler: jest.Mock; beforeEach(() => { store = new InMemoryFeatureStore(); - config = new Configuration({ - featureStore: store, - pollInterval: shortInterval, - logger: new TestLogger(), - }); + testLogger = new TestLogger(); initSuccessHandler = jest.fn(); errorHandler = jest.fn(); - // Configuration will not let us set this as low as needed for the test. 
- Object.defineProperty(config, 'pollInterval', { value: 0.1 }); processor = new PollingProcessor( - config, requestor as unknown as Requestor, - config.featureStoreFactory(new ClientContext('', config, createBasicPlatform())), + shortInterval, + store, + testLogger, initSuccessHandler, errorHandler, ); @@ -146,7 +132,6 @@ describe('given a polling processor with a short poll duration', () => { expect(errorHandler).not.toBeCalled(); setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBe(0); expect(testLogger.getCount(LogLevel.Warn)).toBeGreaterThan(2); (done as jest.DoneCallback)(); @@ -164,7 +149,6 @@ describe('given a polling processor with a short poll duration', () => { setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBeGreaterThan(2); (done as jest.DoneCallback)(); }, 300); @@ -187,7 +171,6 @@ describe('given a polling processor with a short poll duration', () => { setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBe(1); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBe(1); (done as jest.DoneCallback)(); }, 300); diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts new file mode 100644 index 0000000000..e0d77e5fd2 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -0,0 +1,227 @@ +import { DataSourceErrorKind, LDPollingError, subsystem } from '../../src'; +import PollingProcessorFDv2 from '../../src/data_sources/PollingProcessorFDv2'; +import Requestor from '../../src/data_sources/Requestor'; +import TestLogger, { LogLevel } from '../Logger'; + +describe('given an event processor', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const longInterval = 100000; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let processor: PollingProcessorFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + + beforeEach(() => { + processor = new PollingProcessorFDv2( + requestor as unknown as Requestor, + longInterval, + new TestLogger(), + ); + }); + + afterEach(() => { + processor.stop(); + jest.restoreAllMocks(); + }); + + it('makes no requests before being started', () => { + expect(requestor.requestAllData).not.toHaveBeenCalled(); + }); + + it('polls immediately on start', () => { + processor.start(mockDataCallback, mockStatusCallback); + expect(requestor.requestAllData).toHaveBeenCalledTimes(1); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + }); + + it('calls callback on success', () => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, 
{ + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }); + }); +}); + +describe('given a polling processor with a short poll duration', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const shortInterval = 0.1; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let testLogger: TestLogger; + let processor: PollingProcessorFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + + beforeEach(() => { + testLogger = new TestLogger(); + + processor = new PollingProcessorFDv2( + requestor as unknown as Requestor, + shortInterval, + testLogger, + ); + }); + + afterEach(() => { + processor.stop(); + jest.resetAllMocks(); + }); + + it('polls repeatedly', (done) => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + + processor.start(mockDataCallback, mockStatusCallback); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(4); + done(); + }, 500); + }); + + it.each([400, 408, 429, 500, 503])( + 'continues polling after recoverable error', + (status, done) => { + requestor.requestAllData = jest.fn((cb) => + cb( + { + status, + }, + undefined, + ), + ); + + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + `Received error ${status} for polling request - will retry`, + status as number, + ), + ); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(testLogger.getCount(LogLevel.Error)).toBe(0); + expect(testLogger.getCount(LogLevel.Warn)).toBeGreaterThan(2); + (done as jest.DoneCallback)(); + }, 300); + }, + ); + + it('continues polling after receiving invalid JSON', (done) => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, '{sad')); + + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + `Malformed JSON data in polling response`, + ), + ); + + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(testLogger.getCount(LogLevel.Error)).toBeGreaterThan(2); + (done as jest.DoneCallback)(); + }, 300); + }); + + it.each([401, 403])( + 'does not continue after non-recoverable error', + (status, done) => { + requestor.requestAllData = jest.fn((cb) => + cb( + { + status, + }, + undefined, + ), + ); + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, 
subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + status === 401 + ? `Received error ${status} (invalid SDK key) for polling request - giving up permanently` + : `Received error ${status} for polling request - giving up permanently`, + status as number, + ), + ); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBe(1); + expect(testLogger.getCount(LogLevel.Error)).toBe(1); + (done as jest.DoneCallback)(); + }, 300); + }, + ); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts index c30a91c446..4f864bc51b 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -37,7 +37,7 @@ const events = { data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', }, 'put-object': { - data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + data: '{"kind": "flag", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', }, 'payload-transferred': { data: '{"state": "mockState", "version": 1}', @@ -66,11 +66,11 @@ const createMockEventSource = (streamUri: string = '', options: any = {}) => ({ describe('given a stream processor with mock event source', () => { let info: Info; - let streamingProcessor: subsystem.LDStreamProcessor; + let streamingProcessor: StreamingProcessorFDv2; let diagnosticsManager: internal.DiagnosticsManager; - let listener: internal.PayloadListener; let mockEventSource: any; - let mockErrorHandler: jest.Mock; + let mockDataCallback: jest.Mock; + let mockStatusCallback: jest.Mock; let simulateEvents: (e?: any) => void; let simulateError: (e: { status: number; message: string }) => boolean; @@ -84,7 +84,6 @@ describe('given a stream processor with mock event source', () => { }); beforeEach(() => { - mockErrorHandler = jest.fn(); info = basicPlatform.info; @@ -95,6 +94,7 @@ describe('given a stream processor with mock event source', () => { }), } as any; simulateEvents = (e: any = events) => { + // positions in these call arrays match order of handler attachment in payloadStreamReader mockEventSource.addEventListener.mock.calls[0][1](e['server-intent']); // server intent listener mockEventSource.addEventListener.mock.calls[1][1](e['put-object']); // put listener mockEventSource.addEventListener.mock.calls[3][1](e['payload-transferred']); // payload transferred listener @@ -102,8 +102,6 @@ describe('given a stream processor with mock event source', () => { simulateError = (e: { status: number; message: string }): boolean => mockEventSource.options.errorFilter(e); - listener = jest.fn(); - diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {}); streamingProcessor = new StreamingProcessorFDv2( { @@ -112,18 +110,19 @@ describe('given a stream processor with mock event source', () => { }, '/all', [], - listener, { authorization: 'my-sdk-key', 'user-agent': 'TestUserAgent/2.0.2', 'x-launchdarkly-wrapper': 'Rapper/1.2.3', }, diagnosticsManager, - mockErrorHandler, ); jest.spyOn(streamingProcessor, 'stop'); - streamingProcessor.start(); + + mockDataCallback = jest.fn(); + mockStatusCallback = jest.fn(); + streamingProcessor.start(mockDataCallback, mockStatusCallback); 
}); afterEach(() => { @@ -152,17 +151,15 @@ describe('given a stream processor with mock event source', () => { }, '/all', [], - listener, { authorization: 'my-sdk-key', 'user-agent': 'TestUserAgent/2.0.2', 'x-launchdarkly-wrapper': 'Rapper/1.2.3', }, diagnosticsManager, - mockErrorHandler, 22, ); - streamingProcessor.start(); + streamingProcessor.start(jest.fn(), jest.fn()); expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( `${serviceEndpoints.streaming}/all`, @@ -211,7 +208,20 @@ describe('given a stream processor with mock event source', () => { it('executes payload listener', () => { simulateEvents(); - expect(listener).toHaveBeenCalled(); + expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }); }); it('passes error to callback if json data is malformed', async () => { @@ -221,8 +231,11 @@ describe('given a stream processor with mock event source', () => { }, }); - expect(mockErrorHandler.mock.calls[0][0].kind).toEqual(DataSourceErrorKind.InvalidData); - expect(mockErrorHandler.mock.calls[0][0].message).toEqual('Malformed data in event stream'); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDStreamingError(DataSourceErrorKind.InvalidData, 'Malformed data in EventStream.'), + ); }); it('calls error handler if event.data prop is missing', async () => { @@ -238,9 +251,12 @@ describe('given a stream processor with mock event source', () => { notData: '{"state": "mockState", "version": 1}', }, }); - expect(listener).not.toHaveBeenCalled(); - expect(mockErrorHandler.mock.calls[0][0].kind).toEqual(DataSourceErrorKind.Unknown); - expect(mockErrorHandler.mock.calls[0][0].message).toMatch(/unexpected message/i); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDStreamingError(DataSourceErrorKind.Unknown, 'Event from EventStream missing data.'), + ); }); it('closes and stops', async () => { @@ -271,7 +287,8 @@ describe('given a stream processor with mock event source', () => { const willRetry = simulateError(testError); expect(willRetry).toBeTruthy(); - expect(mockErrorHandler).not.toBeCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith(2, subsystem.DataSourceState.Interrupted); expect(logger.warn).toBeCalledWith( expect.stringMatching(new RegExp(`${status}.*will retry`)), ); @@ -292,7 +309,9 @@ describe('given a stream processor with mock event source', () => { const willRetry = simulateError(testError); expect(willRetry).toBeFalsy(); - expect(mockErrorHandler).toBeCalledWith( + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Closed, new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status), ); expect(logger.error).toBeCalledWith( diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index f149306e48..e8afbf53f4 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -2,6 +2,7 @@ import { cancelableTimedPromise, ClientContext, + CompositeDataSource, Context, defaultHeaders, internal, @@ -37,11 +38,12 @@ import { } from 
'./api/options/LDDataSystemOptions'; import BigSegmentsManager from './BigSegmentsManager'; import BigSegmentStoreStatusProvider from './BigSegmentStatusProviderImpl'; -import { createStreamListeners } from './data_sources/createStreamListeners'; +import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; -import PollingProcessor from './data_sources/PollingProcessor'; +import OneShotInitializerFDv2 from './data_sources/OneShotInitializerFDv2'; +import PollingProcessorFDv2 from './data_sources/PollingProcessorFDv2'; import Requestor from './data_sources/Requestor'; -import StreamingProcessor from './data_sources/StreamingProcessor'; +import StreamingProcessorFDv2 from './data_sources/StreamingProcessorFDv2'; import createDiagnosticsInitConfig from './diagnostics/createDiagnosticsInitConfig'; import { allAsync } from './evaluation/collection'; import { Flag } from './evaluation/data/Flag'; @@ -102,6 +104,8 @@ export default class LDClientImpl implements LDClient { private _updateProcessor?: subsystem.LDStreamProcessor; + private _dataSource?: subsystem.DataSource; + private _eventFactoryDefault = new EventFactory(false); private _eventFactoryWithReasons = new EventFactory(true); @@ -221,50 +225,82 @@ export default class LDClientImpl implements LDClient { }; this._evaluator = new Evaluator(this._platform, queries); - const listeners = createStreamListeners(dataSourceUpdates, this._logger, { - put: () => this._initSuccess(), - }); - const makeDefaultProcessor = () => { - if (isPollingOnlyOptions(config.dataSystem.dataSource)) { - return new PollingProcessor( - new Requestor(config, this._platform.requests, baseHeaders), - config.dataSystem.dataSource.pollInterval ?? 30, - dataSourceUpdates, - config.logger, - () => this._initSuccess(), - (e) => this._dataSourceErrorHandler(e), - ); - } - // TODO: SDK-858 Hook up composite data source and config - const reconnectDelay = - isStandardOptions(config.dataSystem.dataSource) || - isStreamingOnlyOptions(config.dataSystem.dataSource) - ? config.dataSystem.dataSource.streamInitialReconnectDelay - : 1; - return new StreamingProcessor( + if (!(config.offline || config.dataSystem.useLdd)) { + // use configured update processor factory if one exists + const updateProcessor = config.dataSystem.updateProcessorFactory?.( clientContext, - '/all', - [], - listeners, - baseHeaders, - this._diagnosticsManager, + dataSourceUpdates, + () => this._initSuccess(), (e) => this._dataSourceErrorHandler(e), - reconnectDelay, ); - }; + if (updateProcessor) { + this._updateProcessor = updateProcessor; + this._updateProcessor?.start(); + } else { + // make the FDv2 composite datasource with initializers/synchronizers + const initializers: subsystem.LDSynchronizerFactory[] = []; + + // use one shot initializer for performance and cost + initializers.push( + () => + new OneShotInitializerFDv2( + new Requestor(config, this._platform.requests, baseHeaders), + config.logger, + ), + ); - if (!(config.offline || config.dataSystem.useLdd)) { - this._updateProcessor = - config.dataSystem.updateProcessorFactory?.( - clientContext, - dataSourceUpdates, - () => this._initSuccess(), - (e) => this._dataSourceErrorHandler(e), - ) ?? 
makeDefaultProcessor(); - } + const synchronizers: subsystem.LDSynchronizerFactory[] = []; + // if streaming is configured, add streaming synchronizer + if ( + isStandardOptions(config.dataSystem.dataSource) || + isStreamingOnlyOptions(config.dataSystem.dataSource) + ) { + const reconnectDelay = config.dataSystem.dataSource.streamInitialReconnectDelay; + synchronizers.push( + () => + new StreamingProcessorFDv2( + clientContext, + '/all', + [], + baseHeaders, + this._diagnosticsManager, + reconnectDelay, + ), + ); + } + + // if polling is configured, add polling synchronizer + if ( + isStandardOptions(config.dataSystem.dataSource) || + isPollingOnlyOptions(config.dataSystem.dataSource) + ) { + const pollingInterval = config.dataSystem.dataSource.pollInterval; + synchronizers.push( + () => + new PollingProcessorFDv2( + new Requestor(config, this._platform.requests, baseHeaders), + pollingInterval, + config.logger, + ), + ); + } + + this._dataSource = new CompositeDataSource(initializers, synchronizers, this.logger); + const payloadListener = createPayloadListener(dataSourceUpdates, this.logger, () => { + this._initSuccess(); + }); - if (this._updateProcessor) { - this._updateProcessor.start(); + this._dataSource.start( + (_, payload) => { + payloadListener(payload); + }, + (_, err) => { + if (err) { + this._dataSourceErrorHandler(err); + } + }, + ); + } } else { // Deferring the start callback should allow client construction to complete before we start // emitting events. Allowing the client an opportunity to register events. @@ -285,7 +321,10 @@ export default class LDClientImpl implements LDClient { // If there is no update processor, then there is functionally no initialization // so it is fine not to wait. - if (options?.timeout === undefined && this._updateProcessor !== undefined) { + if ( + options?.timeout === undefined && + (this._updateProcessor !== undefined || this._dataSource !== undefined) + ) { this._logger?.warn( 'The waitForInitialization function was called without a timeout specified.' 
+ ' In a future version a default timeout will be applied.', @@ -294,7 +333,7 @@ export default class LDClientImpl implements LDClient { if ( options?.timeout !== undefined && options?.timeout > HIGH_TIMEOUT_THRESHOLD && - this._updateProcessor !== undefined + (this._updateProcessor !== undefined || this._dataSource !== undefined) ) { this._logger?.warn( 'The waitForInitialization function was called with a timeout greater than ' + @@ -740,6 +779,7 @@ export default class LDClientImpl implements LDClient { close(): void { this._eventProcessor.close(); this._updateProcessor?.close(); + this._dataSource?.stop(); this._featureStore.close(); this._bigSegmentsManager.close(); } diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts new file mode 100644 index 0000000000..8049511e8d --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -0,0 +1,103 @@ +import { + DataSourceErrorKind, + httpErrorMessage, + internal, + LDLogger, + LDPollingError, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; +import Requestor from './Requestor'; + +/** + * @internal + */ +export default class OneShotInitializerFDv2 implements subsystemCommon.DataSystemInitializer { + constructor( + private readonly _requestor: Requestor, + private readonly _logger?: LDLogger, + ) {} + + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + statusCallback(subsystemCommon.DataSourceState.Initializing); + + this._logger?.debug('Performing initialization request to LaunchDarkly for feature flag data.'); + this._requestor.requestAllData((err, body) => { + if (err) { + const { status } = err; + const message = httpErrorMessage(err, 'initializer', 'initializer does not retry'); + this._logger?.error(message); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + ); + return; + } + + if (!body) { + this._logger?.error('One shot initializer response missing body.'); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.InvalidData, + 'One shot initializer response missing body.', + ), + ); + return; + } + + try { + const parsed = JSON.parse(body) as internal.FDv2EventsCollection; + const payloadProcessor = new internal.PayloadProcessor( + { + flag: (flag: Flag) => { + processFlag(flag); + return flag; + }, + segment: (segment: Segment) => { + processSegment(segment); + return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(errorKind, message), + ); + }, + this._logger, + ); + + payloadProcessor.addPayloadListener((payload) => { + dataCallback(payload.basis, payload); + }); + + payloadProcessor.processEvents(parsed.events); + + // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API + statusCallback(subsystemCommon.DataSourceState.Valid); + } catch { + // We could not parse this JSON. Report the problem. 
+ this._logger?.error('Initialization response contained invalid data'); + this._logger?.debug(`Malformed JSON follows: ${body}`); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.InvalidData, + 'Malformed JSON data in polling response', + ), + ); + } + }); + } + + stop() { + // no-op since requestor has no cancellation support + } +} diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts new file mode 100644 index 0000000000..2f155cc288 --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -0,0 +1,152 @@ +import { + DataSourceErrorKind, + httpErrorMessage, + internal, + isHttpRecoverable, + LDLogger, + LDPollingError, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; +import Requestor from './Requestor'; + +export type PollingErrorHandler = (err: LDPollingError) => void; + +/** + * @internal + */ +export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { + private _stopped = false; + private _timeoutHandle: any; + + private _statusCallback?: (status: subsystemCommon.DataSourceState, err?: any) => void; + + constructor( + private readonly _requestor: Requestor, + private readonly _pollInterval: number = 30, + private readonly _logger?: LDLogger, + ) {} + + private _poll( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + if (this._stopped) { + return; + } + + const startTime = Date.now(); + this._logger?.debug('Polling LaunchDarkly for feature flag updates'); + this._requestor.requestAllData((err, body) => { + const elapsed = Date.now() - startTime; + const sleepFor = Math.max(this._pollInterval * 1000 - elapsed, 0); + + this._logger?.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor); + if (err) { + const { status } = err; + if (status && !isHttpRecoverable(status)) { + const message = httpErrorMessage(err, 'polling request'); + this._logger?.error(message); + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + ); + // It is not recoverable, return and do not trigger another poll. 
+          return;
+        }
+
+        const message = httpErrorMessage(err, 'polling request', 'will retry');
+        statusCallback(
+          subsystemCommon.DataSourceState.Interrupted,
+          new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status),
+        );
+        this._logger?.warn(message);
+        // schedule poll
+        this._timeoutHandle = setTimeout(() => {
+          this._poll(dataCallback, statusCallback);
+        }, sleepFor);
+        return;
+      }
+
+      if (!body) {
+        this._logger?.warn('Response missing body, will retry.');
+        // schedule poll
+        this._timeoutHandle = setTimeout(() => {
+          this._poll(dataCallback, statusCallback);
+        }, sleepFor);
+        return;
+      }
+
+      try {
+        const parsed = JSON.parse(body) as internal.FDv2EventsCollection;
+        const payloadProcessor = new internal.PayloadProcessor(
+          {
+            flag: (flag: Flag) => {
+              processFlag(flag);
+              return flag;
+            },
+            segment: (segment: Segment) => {
+              processSegment(segment);
+              return segment;
+            },
+          },
+          (errorKind: DataSourceErrorKind, message: string) => {
+            statusCallback(
+              subsystemCommon.DataSourceState.Interrupted,
+              new LDPollingError(errorKind, message),
+            );
+          },
+          this._logger,
+        );
+
+        payloadProcessor.addPayloadListener((payload) => {
+          dataCallback(payload.basis, payload);
+        });
+
+        payloadProcessor.processEvents(parsed.events);
+
+        // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API
+        statusCallback(subsystemCommon.DataSourceState.Valid);
+      } catch {
+        // We could not parse this JSON. Report the problem and fallthrough to
+        // start another poll.
+        this._logger?.error('Polling received malformed data');
+        this._logger?.debug(`Malformed JSON follows: ${body}`);
+        statusCallback(
+          subsystemCommon.DataSourceState.Interrupted,
+          new LDPollingError(
+            DataSourceErrorKind.InvalidData,
+            'Malformed JSON data in polling response',
+          ),
+        );
+      }
+
+      // schedule poll
+      this._timeoutHandle = setTimeout(() => {
+        this._poll(dataCallback, statusCallback);
+      }, sleepFor);
+    });
+  }
+
+  start(
+    dataCallback: (basis: boolean, data: any) => void,
+    statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void,
+  ) {
+    this._statusCallback = statusCallback; // hold reference for usage in stop()
+    statusCallback(subsystemCommon.DataSourceState.Initializing);
+    this._poll(dataCallback, statusCallback);
+  }
+
+  stop() {
+    if (this._timeoutHandle) {
+      clearTimeout(this._timeoutHandle);
+      this._timeoutHandle = undefined;
+    }
+    this._statusCallback?.(subsystemCommon.DataSourceState.Closed);
+    this._stopped = true;
+    this._statusCallback = undefined;
+  }
+}
diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts
index 901c3d3ec7..7378f79562 100644
--- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts
+++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts
@@ -11,17 +11,14 @@ import {
   LDStreamingError,
   Requests,
   shouldRetry,
-  StreamingErrorHandler,
-  subsystem,
+  subsystem as subsystemCommon,
 } from '@launchdarkly/js-sdk-common';
-import { PayloadListener } from '@launchdarkly/js-sdk-common/dist/esm/internal';
 
 import { Flag } from '../evaluation/data/Flag';
 import { Segment } from '../evaluation/data/Segment';
 import { processFlag, processSegment } from '../store/serialization';
 
-// TODO: consider naming this StreamingDatasource
-export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcessor {
+export default class StreamingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer {
   private readonly _headers: { [key: string]: string | string[] };
   private readonly _streamUri: string;
   private readonly _logger?: LDLogger;
@@ -34,10 +31,8 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess
     clientContext: ClientContext,
     streamUriPath: string,
     parameters: { key: string; value: string }[],
-    private readonly _payloadListener: PayloadListener,
     baseHeaders: LDHeaders,
     private readonly _diagnosticsManager?: internal.DiagnosticsManager,
-    private readonly _errorHandler?: StreamingErrorHandler,
     private readonly _streamInitialReconnectDelay = 1,
   ) {
     const { basicConfiguration, platform } = clientContext;
@@ -54,7 +49,7 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess
     );
   }
 
-  private _logConnectionStarted() {
+  private _logConnectionAttempt() {
     this._connectionAttemptStartTime = Date.now();
   }
 
@@ -79,34 +74,43 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess
    *
    * @private
    */
-  private _retryAndHandleError(err: HttpErrorResponse) {
+  private _retryAndHandleError(
+    err: HttpErrorResponse,
+    statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void,
+  ) {
     if (!shouldRetry(err)) {
+      this._logger?.error(httpErrorMessage(err, 'streaming request'));
       this._logConnectionResult(false);
-      this._errorHandler?.(
+      statusCallback(
+        subsystemCommon.DataSourceState.Closed,
         new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status),
       );
-      this._logger?.error(httpErrorMessage(err, 'streaming request'));
       return false;
     }
 
     this._logger?.warn(httpErrorMessage(err, 'streaming request', 'will retry'));
     this._logConnectionResult(false);
-    this._logConnectionStarted();
+    this._logConnectionAttempt();
+    statusCallback(subsystemCommon.DataSourceState.Interrupted);
     return true;
   }
 
-  start() {
-    this._logConnectionStarted();
+  start(
+    dataCallback: (basis: boolean, data: any) => void,
+    statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void,
+  ) {
+    this._logConnectionAttempt();
+    statusCallback(subsystemCommon.DataSourceState.Initializing);
     const eventSource = this._requests.createEventSource(this._streamUri, {
       headers: this._headers,
-      errorFilter: (error: HttpErrorResponse) => this._retryAndHandleError(error),
+      errorFilter: (error: HttpErrorResponse) => this._retryAndHandleError(error, statusCallback),
       initialRetryDelayMillis: 1000 * this._streamInitialReconnectDelay,
       readTimeoutMillis: 5 * 60 * 1000,
       retryResetIntervalMillis: 60 * 1000,
     });
     this._eventSource = eventSource;
 
-    const payloadReader = new internal.PayloadReader(
+    const payloadReader = new internal.PayloadStreamReader(
       eventSource,
       {
         flag: (flag: Flag) => {
@@ -119,19 +123,24 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess
         },
       },
       (errorKind: DataSourceErrorKind, message: string) => {
-        this._errorHandler?.(new LDStreamingError(errorKind, message));
+        statusCallback(
+          subsystemCommon.DataSourceState.Interrupted,
+          new LDStreamingError(errorKind, message),
+        );
+
+        // parsing error was encountered, defensively close the data source
+        this.stop();
       },
       this._logger,
     );
-    payloadReader.addPayloadListener(() => {
-      // TODO: discuss if it is satisfactory to switch from setting connection result on single event to getting a payload. Need
-      // to double check the handling in the ServerIntent:none case. That may not trigger these payload listeners.
+    payloadReader.addPayloadListener((payload) => {
       this._logConnectionResult(true);
+      dataCallback(payload.basis, payload);
     });
-    payloadReader.addPayloadListener(this._payloadListener);
 
     eventSource.onclose = () => {
       this._logger?.info('Closed LaunchDarkly stream connection');
+      statusCallback(subsystemCommon.DataSourceState.Closed);
     };
 
     eventSource.onerror = () => {
@@ -140,6 +149,7 @@
 
     eventSource.onopen = () => {
       this._logger?.info('Opened LaunchDarkly stream connection');
+      statusCallback(subsystemCommon.DataSourceState.Valid);
     };
 
     eventSource.onretrying = (e) => {
diff --git a/packages/shared/sdk-server/src/diagnostics/createDiagnosticsInitConfig.ts b/packages/shared/sdk-server/src/diagnostics/createDiagnosticsInitConfig.ts
index a4892e73a9..91b0a36663 100644
--- a/packages/shared/sdk-server/src/diagnostics/createDiagnosticsInitConfig.ts
+++ b/packages/shared/sdk-server/src/diagnostics/createDiagnosticsInitConfig.ts
@@ -27,13 +27,17 @@ const createDiagnosticsInitConfig = (
   ...((isStandardOptions(config.dataSystem.dataSource) ||
     isPollingOnlyOptions(config.dataSystem.dataSource)) &&
   config.dataSystem.dataSource.pollInterval
-    ? { pollingIntervalMillis: config.dataSystem.dataSource.pollInterval }
+    ? { pollingIntervalMillis: secondsToMillis(config.dataSystem.dataSource.pollInterval) }
     : null),
   // include reconnect delay if data source config has it
   ...((isStandardOptions(config.dataSystem.dataSource) ||
     isStreamingOnlyOptions(config.dataSystem.dataSource)) &&
   config.dataSystem.dataSource.streamInitialReconnectDelay
-    ? { reconnectTimeMillis: config.dataSystem.dataSource.streamInitialReconnectDelay }
+    ? {
+        reconnectTimeMillis: secondsToMillis(
+          config.dataSystem.dataSource.streamInitialReconnectDelay,
+        ),
+      }
     : null),
   contextKeysFlushIntervalMillis: secondsToMillis(config.contextKeysFlushInterval),
   diagnosticRecordingIntervalMillis: secondsToMillis(config.diagnosticRecordingInterval),