diff --git a/contract-tests/sdkClientEntity.js b/contract-tests/sdkClientEntity.js index 7301ffacfa..fcb51ce9fe 100644 --- a/contract-tests/sdkClientEntity.js +++ b/contract-tests/sdkClientEntity.js @@ -23,6 +23,7 @@ export function makeSdkConfig(options, tag) { const maybeTime = (seconds) => seconds === undefined || seconds === null ? undefined : seconds / 1000; + if (options.streaming) { cf.streamUri = options.streaming.baseUri; cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs); @@ -33,7 +34,7 @@ export function makeSdkConfig(options, tag) { if (options.polling) { cf.stream = false; cf.baseUri = options.polling.baseUri; - cf.pollInterface = options.polling.pollIntervalMs / 1000; + cf.pollInterval = options.polling.pollIntervalMs / 1000; if (options.polling.filter) { cf.payloadFilterKey = options.polling.filter; } @@ -81,6 +82,61 @@ export function makeSdkConfig(options, tag) { cf.wrapperVersion = options.wrapper.version; } } + if (options.dataSystem) { + const dataSourceStreamingOptions = options.dataSystem.synchronizers?.primary?.streaming ?? options.dataSystem.synchronizers?.secondary?.streaming; + const dataSourcePollingOptions = options.dataSystem.synchronizers?.primary?.polling ?? options.dataSystem.synchronizers?.secondary?.polling; + + if (dataSourceStreamingOptions) { + cf.streamUri = dataSourceStreamingOptions.baseUri; + cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs); + if (dataSourceStreamingOptions.filter) { + cf.payloadFilterKey = dataSourceStreamingOptions.filter; + } + } + if (dataSourcePollingOptions) { + cf.stream = false; + cf.baseUri = dataSourcePollingOptions.baseUri; + cf.pollInterval = dataSourcePollingOptions.pollIntervalMs / 1000; + if (dataSourcePollingOptions.filter) { + cf.payloadFilterKey = dataSourcePollingOptions.filter; + } + } + + let dataSourceOptions; + if (dataSourceStreamingOptions && dataSourcePollingOptions) { + dataSourceOptions = { + type: 'standard', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && + { streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }), + ...(dataSourcePollingOptions.pollIntervalMs != null && + { pollInterval: dataSourcePollingOptions.pollIntervalMs }), + } + } else if (dataSourceStreamingOptions) { + dataSourceOptions = { + type: 'streamingOnly', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && + { streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs) }), + } + } else if (dataSourcePollingOptions) { + dataSourceOptions = { + type: 'pollingOnly', + ...(dataSourcePollingOptions.pollIntervalMs != null && + { pollInterval: dataSourcePollingOptions.pollIntervalMs }), + } + } else { + // No data source options were specified + dataSourceOptions = undefined; + } + + if (options.dataSystem.payloadFilter) { + cf.payloadFilterKey = options.dataSystem.payloadFilter; + } + + cf.dataSystem = { + dataSource: dataSourceOptions, + } + } + return cf; } diff --git a/contract-tests/testharness-suppressions.txt b/contract-tests/testharness-suppressions.txt index 8a29da94d7..e6fdd9ffc9 100644 --- a/contract-tests/testharness-suppressions.txt +++ b/contract-tests/testharness-suppressions.txt @@ -1,2 +1,16 @@ +polling/requests +polling/payload/large payloads + streaming/validation/drop and reconnect if stream event has malformed JSON streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema + +wrapper/poll requests/wrapper name and version + +streaming/fdv2/reconnection state management/initializes from polling initializer +streaming/fdv2/reconnection state management/initializes from 2 polling initializers + +streaming/fdv2/reconnection state management/saves previously known state +streaming/fdv2/reconnection state management/replaces previously known state +streaming/fdv2/reconnection state management/updates previously known state +streaming/fdv2/ignores model version +streaming/fdv2/can discard partial events on errors \ No newline at end of file diff --git a/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts b/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts index 2212062121..effa9bcd0a 100644 --- a/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts @@ -57,6 +57,48 @@ it('it sets basis to false when intent code is xfer-changes', () => { expect(receivedPayloads[0].basis).toEqual(false); }); +it('it handles xfer-full then xfer-changes', () => { + const mockStream = new MockEventStream(); + const receivedPayloads: Payload[] = []; + const readerUnderTest = new PayloadReader(mockStream, { + mockKind: (it) => it, // obj processor that just returns the same obj + }); + readerUnderTest.addPayloadListener((it) => { + receivedPayloads.push(it); + }); + + mockStream.simulateEvent('server-intent', { + data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagA", "version": 456, "object": {"objectFieldA": "newValue"}}', + }); + mockStream.simulateEvent('payload-transferred', { + data: '{"state": "mockState", "version": 1}', + }); + expect(receivedPayloads.length).toEqual(2); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); + expect(receivedPayloads[0].basis).toEqual(true); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' }); + expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); + + expect(receivedPayloads[1].id).toEqual('mockId'); + expect(receivedPayloads[1].state).toEqual('mockState'); + expect(receivedPayloads[1].basis).toEqual(false); + expect(receivedPayloads[1].updates.length).toEqual(1); + expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldA: 'newValue' }); + expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined); +}); + it('it includes multiple types of updates in payload', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; @@ -95,7 +137,7 @@ it('it includes multiple types of updates in payload', () => { expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined); }); -it('it does not include messages thats are not between server-intent and payloader-transferred', () => { +it('it does not include messages thats are not between server-intent and payload-transferred', () => { const mockStream = new MockEventStream(); const receivedPayloads: Payload[] = []; const readerUnderTest = new PayloadReader(mockStream, { @@ -183,12 +225,15 @@ it('logs prescribed message when error event is encountered', () => { mockStream.simulateEvent('error', { data: '{"reason": "Womp womp"}', }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}', + }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', }); - expect(receivedPayloads.length).toEqual(0); + expect(receivedPayloads.length).toEqual(1); expect(mockLogger.info).toHaveBeenCalledWith( - 'An issue was encountered receiving updates for payload mockId with reason: Womp womp. Automatic retry will occur.', + 'An issue was encountered receiving updates for payload mockId with reason: Womp womp.', ); }); @@ -222,6 +267,9 @@ it('discards partially transferred data when an error is encountered', () => { mockStream.simulateEvent('error', { data: '{"reason": "Womp womp"}', }); + mockStream.simulateEvent('put-object', { + data: '{"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}', + }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', }); @@ -240,17 +288,23 @@ it('discards partially transferred data when an error is encountered', () => { mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState2", "version": 1}', }); - expect(receivedPayloads.length).toEqual(1); - expect(receivedPayloads[0].id).toEqual('mockId2'); - expect(receivedPayloads[0].state).toEqual('mockState2'); + expect(receivedPayloads.length).toEqual(2); + expect(receivedPayloads[0].id).toEqual('mockId'); + expect(receivedPayloads[0].state).toEqual('mockState'); expect(receivedPayloads[0].basis).toEqual(true); - expect(receivedPayloads[0].updates.length).toEqual(3); - expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldX: 'objectValueX' }); + expect(receivedPayloads[0].updates.length).toEqual(1); + expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldB: 'objectValueB' }); expect(receivedPayloads[0].updates[0].deleted).toEqual(undefined); - expect(receivedPayloads[0].updates[1].object).toEqual(undefined); - expect(receivedPayloads[0].updates[1].deleted).toEqual(true); - expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' }); - expect(receivedPayloads[0].updates[2].deleted).toEqual(undefined); + expect(receivedPayloads[1].id).toEqual('mockId2'); + expect(receivedPayloads[1].state).toEqual('mockState2'); + expect(receivedPayloads[1].basis).toEqual(true); + expect(receivedPayloads[1].updates.length).toEqual(3); + expect(receivedPayloads[1].updates[0].object).toEqual({ objectFieldX: 'objectValueX' }); + expect(receivedPayloads[1].updates[0].deleted).toEqual(undefined); + expect(receivedPayloads[1].updates[1].object).toEqual(undefined); + expect(receivedPayloads[1].updates[1].deleted).toEqual(true); + expect(receivedPayloads[1].updates[2].object).toEqual({ objectFieldZ: 'objectValueZ' }); + expect(receivedPayloads[1].updates[2].deleted).toEqual(undefined); }); it('silently ignores unrecognized kinds', () => { diff --git a/packages/shared/common/src/internal/fdv2/payloadReader.ts b/packages/shared/common/src/internal/fdv2/payloadReader.ts index 68b1320fc3..dcd86208c5 100644 --- a/packages/shared/common/src/internal/fdv2/payloadReader.ts +++ b/packages/shared/common/src/internal/fdv2/payloadReader.ts @@ -44,7 +44,7 @@ export class PayloadReader { private _listeners: PayloadListener[] = []; private _tempId?: string = undefined; - private _tempBasis?: boolean = undefined; + private _tempBasis: boolean = false; private _tempUpdates: Update[] = []; /** @@ -105,7 +105,7 @@ export class PayloadReader { private _processServerIntent = (data: ServerIntentData) => { // clear state in prep for handling data - this._resetState(); + this._resetAll(); // if there's no payloads, return if (!data.payloads.length) { @@ -133,7 +133,7 @@ export class PayloadReader { private _processPutObject = (data: PutObject) => { // if the following properties haven't been provided by now, we should ignore the event if ( - !this._tempId || // server intent hasn't been recieved yet. + !this._tempId || // server intent hasn't been received yet. !data.kind || !data.key || !data.version || @@ -144,7 +144,7 @@ export class PayloadReader { const obj = this._processObj(data.kind, data.object); if (!obj) { - this._logger?.warn(`Unable to prcoess object for kind: '${data.kind}'`); + this._logger?.warn(`Unable to process object for kind: '${data.kind}'`); // ignore unrecognized kinds return; } @@ -176,12 +176,11 @@ export class PayloadReader { private _processPayloadTransferred = (data: PayloadTransferred) => { // if the following properties haven't been provided by now, we should reset if ( - !this._tempId || // server intent hasn't been recieved yet. + !this._tempId || // server intent hasn't been received yet. !data.state || - !data.version || - this._tempBasis === undefined + !data.version ) { - this._resetState(); // a reset is best defensive action since payload transferred terminates a payload + this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload return; } @@ -194,26 +193,35 @@ export class PayloadReader { }; this._listeners.forEach((it) => it(payload)); - this._resetState(); + this._resetAfterEmission(); }; private _processGoodbye = (data: any) => { this._logger?.info( `Goodbye was received from the LaunchDarkly connection with reason: ${data.reason}.`, ); - this._resetState(); + this._resetAll(); }; private _processError = (data: any) => { this._logger?.info( - `An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}. Automatic retry will occur.`, + `An issue was encountered receiving updates for payload ${this._tempId} with reason: ${data.reason}.`, ); - this._resetState(); + this._resetAfterError(); }; - private _resetState() { + private _resetAfterEmission() { + this._tempBasis = false; + this._tempUpdates = []; + } + + private _resetAfterError() { + this._tempUpdates = []; + } + + private _resetAll() { this._tempId = undefined; - this._tempBasis = undefined; + this._tempBasis = false; this._tempUpdates = []; } } diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index f149306e48..74f4df9077 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -2,6 +2,7 @@ import { cancelableTimedPromise, ClientContext, + CompositeDataSource, Context, defaultHeaders, internal, @@ -37,11 +38,13 @@ import { } from './api/options/LDDataSystemOptions'; import BigSegmentsManager from './BigSegmentsManager'; import BigSegmentStoreStatusProvider from './BigSegmentStatusProviderImpl'; +import { createPayloadListener } from './data_sources/createPayloadListenerFDv2'; import { createStreamListeners } from './data_sources/createStreamListeners'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; +import OneShotInitializer from './data_sources/OneShotInitializer'; import PollingProcessor from './data_sources/PollingProcessor'; import Requestor from './data_sources/Requestor'; -import StreamingProcessor from './data_sources/StreamingProcessor'; +import StreamingProcessorFDv2 from './data_sources/StreamingProcessorFDv2'; import createDiagnosticsInitConfig from './diagnostics/createDiagnosticsInitConfig'; import { allAsync } from './evaluation/collection'; import { Flag } from './evaluation/data/Flag'; @@ -102,6 +105,8 @@ export default class LDClientImpl implements LDClient { private _updateProcessor?: subsystem.LDStreamProcessor; + private _dataSource?: subsystem.DataSource; + private _eventFactoryDefault = new EventFactory(false); private _eventFactoryWithReasons = new EventFactory(true); @@ -221,50 +226,73 @@ export default class LDClientImpl implements LDClient { }; this._evaluator = new Evaluator(this._platform, queries); - const listeners = createStreamListeners(dataSourceUpdates, this._logger, { - put: () => this._initSuccess(), - }); - const makeDefaultProcessor = () => { - if (isPollingOnlyOptions(config.dataSystem.dataSource)) { - return new PollingProcessor( - new Requestor(config, this._platform.requests, baseHeaders), - config.dataSystem.dataSource.pollInterval ?? 30, - dataSourceUpdates, - config.logger, - () => this._initSuccess(), - (e) => this._dataSourceErrorHandler(e), - ); - } - // TODO: SDK-858 Hook up composite data source and config - const reconnectDelay = - isStandardOptions(config.dataSystem.dataSource) || - isStreamingOnlyOptions(config.dataSystem.dataSource) - ? config.dataSystem.dataSource.streamInitialReconnectDelay - : 1; - return new StreamingProcessor( + if (!(config.offline || config.dataSystem.useLdd)) { + // use configured update processor factory if one exists + const updateProcessor = config.dataSystem.updateProcessorFactory?.( clientContext, - '/all', - [], - listeners, - baseHeaders, - this._diagnosticsManager, + dataSourceUpdates, + () => this._initSuccess(), (e) => this._dataSourceErrorHandler(e), - reconnectDelay, ); - }; + if (updateProcessor) { + this._updateProcessor = updateProcessor; + this._updateProcessor?.start(); + } else { + // make the FDv2 composite datasource with initializers/synchronizers + let initializers: subsystem.LDSynchronizerFactory[] = []; + if (isStandardOptions(config.dataSystem.dataSource)) { + initializers = [ + () => + new OneShotInitializer( + new Requestor(config, this._platform.requests, baseHeaders), + config.logger, + ), + ]; + } else { + initializers = []; + } - if (!(config.offline || config.dataSystem.useLdd)) { - this._updateProcessor = - config.dataSystem.updateProcessorFactory?.( - clientContext, - dataSourceUpdates, - () => this._initSuccess(), - (e) => this._dataSourceErrorHandler(e), - ) ?? makeDefaultProcessor(); - } + let synchronizers: subsystem.LDSynchronizerFactory[] = []; + if (isPollingOnlyOptions(config.dataSystem.dataSource)) { + // TODO: SDK-851 - Make polling synchronizer + synchronizers = []; + } else if ( + isStandardOptions(config.dataSystem.dataSource) || + isStreamingOnlyOptions(config.dataSystem.dataSource) + ) { + const reconnectDelay = config.dataSystem.dataSource.streamInitialReconnectDelay; + synchronizers = [ + () => + new StreamingProcessorFDv2( + clientContext, + '/all', + [], + baseHeaders, + this._diagnosticsManager, + reconnectDelay, + ), + ]; + } else { + // TODO: this is an interesting case to be figured out later + synchronizers = []; + } + + this._dataSource = new CompositeDataSource(initializers, synchronizers, this.logger); + const payloadListener = createPayloadListener(dataSourceUpdates, this.logger, () => { + this._initSuccess(); + }); - if (this._updateProcessor) { - this._updateProcessor.start(); + this._dataSource.start( + (_, payload) => { + payloadListener(payload); + }, + (_, err) => { + if (err) { + this._dataSourceErrorHandler(err); + } + }, + ); + } } else { // Deferring the start callback should allow client construction to complete before we start // emitting events. Allowing the client an opportunity to register events. @@ -285,7 +313,10 @@ export default class LDClientImpl implements LDClient { // If there is no update processor, then there is functionally no initialization // so it is fine not to wait. - if (options?.timeout === undefined && this._updateProcessor !== undefined) { + if ( + options?.timeout === undefined && + (this._updateProcessor !== undefined || this._dataSource !== undefined) + ) { this._logger?.warn( 'The waitForInitialization function was called without a timeout specified.' + ' In a future version a default timeout will be applied.', @@ -294,7 +325,7 @@ export default class LDClientImpl implements LDClient { if ( options?.timeout !== undefined && options?.timeout > HIGH_TIMEOUT_THRESHOLD && - this._updateProcessor !== undefined + (this._updateProcessor !== undefined || this._dataSource !== undefined) ) { this._logger?.warn( 'The waitForInitialization function was called with a timeout greater than ' + @@ -740,6 +771,7 @@ export default class LDClientImpl implements LDClient { close(): void { this._eventProcessor.close(); this._updateProcessor?.close(); + this._dataSource?.stop(); this._featureStore.close(); this._bigSegmentsManager.close(); } diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializer.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializer.ts new file mode 100644 index 0000000000..c1fd78de8c --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializer.ts @@ -0,0 +1,90 @@ +import { + DataSourceErrorKind, + httpErrorMessage, + LDLogger, + LDPollingError, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { deserializePoll } from '../store'; +import VersionedDataKinds from '../store/VersionedDataKinds'; +import Requestor from './Requestor'; + +/** + * @internal + */ +export default class OneShotInitializer implements subsystemCommon.DataSystemInitializer { + constructor( + private readonly _requestor: Requestor, + private readonly _logger?: LDLogger, + ) {} + + /** + * May be called any number of times, if already started, has no effect + * @param dataCallback that will be called when data arrives, may be called multiple times. + * @param statusCallback that will be called when data source state changes or an unrecoverable error + * has been encountered. + */ + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + statusCallback(subsystemCommon.DataSourceState.Initializing); + + // @ts-ignore + // eslint-disable-next-line no-underscore-dangle + console.log(this._requestor._headers); + + this._logger?.debug('Performing initialization request to LaunchDarkly for feature flag data.'); + this._requestor.requestAllData((err, body) => { + if (err) { + const { status } = err; + const message = httpErrorMessage(err, 'initializer', 'will not retry'); + this._logger?.error(message); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + ); + return; + } + + if (!body) { + this._logger?.error('Initialization response missing body'); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.InvalidData, 'Polling response missing body'), + ); + } + + const parsed = deserializePoll(body); + if (!parsed) { + // We could not parse this JSON. Report the problem and fallthrough to + // start another poll. + this._logger?.error('Initialization response contained invalid data'); + this._logger?.debug(`Invalid JSON follows: ${body}`); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.InvalidData, + 'Malformed JSON data in polling response', + ), + ); + } else { + const initData = { + [VersionedDataKinds.Features.namespace]: parsed.flags, + [VersionedDataKinds.Segments.namespace]: parsed.segments, + }; + + // TODO: need to transform this into a payload + + dataCallback(true, initData); + statusCallback(subsystemCommon.DataSourceState.Closed); + } + }); + } + + stop() { + // TODO: at the moment no way to cancel the inflight request via the requester API, but could + // be added in the future. + } +} diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts index 901c3d3ec7..d7715f9a7a 100644 --- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -12,7 +12,7 @@ import { Requests, shouldRetry, StreamingErrorHandler, - subsystem, + subsystem as subsystemCommon, } from '@launchdarkly/js-sdk-common'; import { PayloadListener } from '@launchdarkly/js-sdk-common/dist/esm/internal'; @@ -21,7 +21,7 @@ import { Segment } from '../evaluation/data/Segment'; import { processFlag, processSegment } from '../store/serialization'; // TODO: consider naming this StreamingDatasource -export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcessor { +export default class StreamingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { private readonly _headers: { [key: string]: string | string[] }; private readonly _streamUri: string; private readonly _logger?: LDLogger; @@ -34,10 +34,8 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess clientContext: ClientContext, streamUriPath: string, parameters: { key: string; value: string }[], - private readonly _payloadListener: PayloadListener, baseHeaders: LDHeaders, private readonly _diagnosticsManager?: internal.DiagnosticsManager, - private readonly _errorHandler?: StreamingErrorHandler, private readonly _streamInitialReconnectDelay = 1, ) { const { basicConfiguration, platform } = clientContext; @@ -54,7 +52,7 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess ); } - private _logConnectionStarted() { + private _logConnectionAttempt() { this._connectionAttemptStartTime = Date.now(); } @@ -79,28 +77,32 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess * * @private */ - private _retryAndHandleError(err: HttpErrorResponse) { + private _retryAndHandleError(err: HttpErrorResponse, statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void) { + if (!shouldRetry(err)) { - this._logConnectionResult(false); - this._errorHandler?.( - new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status), - ); this._logger?.error(httpErrorMessage(err, 'streaming request')); + this._logConnectionResult(false); + statusCallback(subsystemCommon.DataSourceState.Closed, new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status)) return false; } this._logger?.warn(httpErrorMessage(err, 'streaming request', 'will retry')); this._logConnectionResult(false); - this._logConnectionStarted(); + this._logConnectionAttempt(); + statusCallback(subsystemCommon.DataSourceState.Interrupted) return true; } - start() { - this._logConnectionStarted(); + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + this._logConnectionAttempt(); + statusCallback(subsystemCommon.DataSourceState.Initializing); const eventSource = this._requests.createEventSource(this._streamUri, { headers: this._headers, - errorFilter: (error: HttpErrorResponse) => this._retryAndHandleError(error), + errorFilter: (error: HttpErrorResponse) => this._retryAndHandleError(error, statusCallback), initialRetryDelayMillis: 1000 * this._streamInitialReconnectDelay, readTimeoutMillis: 5 * 60 * 1000, retryResetIntervalMillis: 60 * 1000, @@ -119,7 +121,7 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess }, }, (errorKind: DataSourceErrorKind, message: string) => { - this._errorHandler?.(new LDStreamingError(errorKind, message)); + statusCallback(subsystemCommon.DataSourceState.Interrupted, new LDStreamingError(errorKind, message)); }, this._logger, ); @@ -128,10 +130,13 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess // to double check the handling in the ServerIntent:none case. That may not trigger these payload listeners. this._logConnectionResult(true); }); - payloadReader.addPayloadListener(this._payloadListener); + payloadReader.addPayloadListener((payload) => { + dataCallback(payload.basis, payload); + }); eventSource.onclose = () => { this._logger?.info('Closed LaunchDarkly stream connection'); + statusCallback(subsystemCommon.DataSourceState.Closed); }; eventSource.onerror = () => { @@ -140,6 +145,7 @@ export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcess eventSource.onopen = () => { this._logger?.info('Opened LaunchDarkly stream connection'); + statusCallback(subsystemCommon.DataSourceState.Valid); }; eventSource.onretrying = (e) => {