diff --git a/package.json b/package.json index 53163bd25c..200b239d2a 100644 --- a/package.json +++ b/package.json @@ -66,6 +66,7 @@ "packageManager": "yarn@3.4.1", "//": "Pin jsonc-parser because v3.3.0 breaks rollup-plugin-esbuild", "resolutions": { - "jsonc-parser": "3.2.0" + "jsonc-parser": "3.2.0", + "parse5": "7.2.1" } } diff --git a/packages/sdk/server-node/__tests__/LDClientNode.proxy.test.ts b/packages/sdk/server-node/__tests__/LDClientNode.proxy.test.ts index 771d5beb92..b49856b909 100644 --- a/packages/sdk/server-node/__tests__/LDClientNode.proxy.test.ts +++ b/packages/sdk/server-node/__tests__/LDClientNode.proxy.test.ts @@ -38,7 +38,7 @@ describe('When using a proxy', () => { it('can use proxy in polling mode', async () => { const proxy = await TestHttpServer.startProxy(); const server = await TestHttpServer.start(); - server.forMethodAndPath('get', '/sdk/latest-all', TestHttpHandlers.respondJson(allData)); + server.forMethodAndPath('get', '/sdk/poll', TestHttpHandlers.respondJson(allData)); const client = new LDClientNode(sdkKey, { baseUri: server.url, @@ -67,7 +67,7 @@ describe('When using a proxy', () => { const server = await TestHttpServer.start(); const events = new AsyncQueue(); events.add({ type: 'put', data: JSON.stringify({ data: allData }) }); - server.forMethodAndPath('get', '/all', TestHttpHandlers.sseStream(events)); + server.forMethodAndPath('get', '/sdk/stream', TestHttpHandlers.sseStream(events)); const client = new LDClientNode(sdkKey, { streamUri: server.url, @@ -94,7 +94,7 @@ describe('When using a proxy', () => { const proxy = await TestHttpServer.startProxy(); const pollingServer = await TestHttpServer.start(); const eventsServer = await TestHttpServer.start(); - pollingServer.forMethodAndPath('get', '/sdk/latest-all', TestHttpHandlers.respondJson(allData)); + pollingServer.forMethodAndPath('get', '/sdk/poll', TestHttpHandlers.respondJson(allData)); eventsServer.forMethodAndPath('post', '/diagnostic', TestHttpHandlers.respond(200)); const client = new LDClientNode(sdkKey, { diff --git a/packages/sdk/server-node/__tests__/LDClientNode.tls.test.ts b/packages/sdk/server-node/__tests__/LDClientNode.tls.test.ts index 11454bb437..50944697a3 100644 --- a/packages/sdk/server-node/__tests__/LDClientNode.tls.test.ts +++ b/packages/sdk/server-node/__tests__/LDClientNode.tls.test.ts @@ -26,7 +26,7 @@ describe('When using a TLS connection', () => { it('can connect via HTTPS to a server with a self-signed certificate, if CA is specified', async () => { server = await TestHttpServer.startSecure(); - server.forMethodAndPath('get', '/sdk/latest-all', TestHttpHandlers.respondJson({})); + server.forMethodAndPath('get', '/sdk/poll', TestHttpHandlers.respondJson({})); client = new LDClientNode('sdk-key', { baseUri: server.url, @@ -41,7 +41,7 @@ describe('When using a TLS connection', () => { it('cannot connect via HTTPS to a server with a self-signed certificate, using default config', async () => { server = await TestHttpServer.startSecure(); - server.forMethodAndPath('get', '/sdk/latest-all', TestHttpHandlers.respondJson({})); + server.forMethodAndPath('get', '/sdk/poll', TestHttpHandlers.respondJson({})); client = new LDClientNode('sdk-key', { baseUri: server.url, @@ -64,7 +64,7 @@ describe('When using a TLS connection', () => { const events = new AsyncQueue(); events.add({ type: 'put', data: JSON.stringify(eventData) }); server = await TestHttpServer.startSecure(); - server.forMethodAndPath('get', '/stream/all', TestHttpHandlers.sseStream(events)); + server.forMethodAndPath('get', '/stream/sdk/stream', TestHttpHandlers.sseStream(events)); client = new LDClientNode('sdk-key', { baseUri: server.url, @@ -83,7 +83,7 @@ describe('When using a TLS connection', () => { it('can use custom TLS options for posting events', async () => { server = await TestHttpServer.startSecure(); server.forMethodAndPath('post', '/events/bulk', TestHttpHandlers.respond(200)); - server.forMethodAndPath('get', '/sdk/latest-all', TestHttpHandlers.respondJson({})); + server.forMethodAndPath('get', '/sdk/poll', TestHttpHandlers.respondJson({})); client = new LDClientNode('sdk-key', { baseUri: server.url, @@ -99,7 +99,7 @@ describe('When using a TLS connection', () => { await client.flush(); const flagsRequest = await server.nextRequest(); - expect(flagsRequest.path).toEqual('/sdk/latest-all'); + expect(flagsRequest.path).toEqual('/sdk/poll'); const eventsRequest = await server.nextRequest(); expect(eventsRequest.path).toEqual('/events/bulk'); diff --git a/packages/sdk/server-node/package.json b/packages/sdk/server-node/package.json index 24024688ce..374fb89744 100644 --- a/packages/sdk/server-node/package.json +++ b/packages/sdk/server-node/package.json @@ -47,7 +47,7 @@ "dependencies": { "@launchdarkly/js-server-sdk-common": "2.10.0", "https-proxy-agent": "^5.0.1", - "launchdarkly-eventsource": "2.0.3" + "launchdarkly-eventsource": "2.2.0" }, "devDependencies": { "@trivago/prettier-plugin-sort-imports": "^4.1.1", diff --git a/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts b/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts index ba7a951292..42b65b5531 100644 --- a/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts +++ b/packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts @@ -2,7 +2,6 @@ import { EventListener, EventName, LDLogger } from '../../../src/api'; import { Payload } from '../../../src/internal/fdv2/payloadProcessor'; import { EventStream, PayloadStreamReader } from '../../../src/internal/fdv2/payloadStreamReader'; - class MockEventStream implements EventStream { private _listeners: Record = {}; @@ -26,7 +25,7 @@ it('it sets basis to true when intent code is xfer-full', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', @@ -48,7 +47,7 @@ it('it sets basis to false when intent code is xfer-changes', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-changes", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-changes", "id": "mockId"}]}', }); mockStream.simulateEvent('payload-transferred', { data: '{"state": "mockState", "version": 1}', @@ -70,7 +69,7 @@ it('it sets basis to false and emits empty payload when intent code is none', () }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "none", "id": "mockId", "target": 42}]}', + data: '{"payloads": [{"intentCode": "none", "id": "mockId", "target": 42}]}', }); expect(receivedPayloads.length).toEqual(1); expect(receivedPayloads[0].id).toEqual('mockId'); @@ -89,7 +88,7 @@ it('it handles xfer-full then xfer-changes', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -131,7 +130,7 @@ it('it includes multiple types of updates in payload', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -172,7 +171,7 @@ it('it does not include messages thats are not between server-intent and payload data: '{"kind": "mockKind", "key": "flagShouldIgnore", "version": 123, "object": {"objectFieldShouldIgnore": "objectValueShouldIgnore"}}', }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -238,7 +237,7 @@ it('logs prescribed message when error event is encountered', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -280,7 +279,7 @@ it('discards partially transferred data when an error is encountered', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -295,7 +294,7 @@ it('discards partially transferred data when an error is encountered', () => { data: '{"state": "mockState", "version": 1}', }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId2"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId2"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagX", "version": 123, "object": {"objectFieldX": "objectValueX"}}', @@ -339,7 +338,7 @@ it('silently ignores unrecognized kinds', () => { }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -369,7 +368,7 @@ it('ignores additional payloads beyond the first payload in the server-intent me }); mockStream.simulateEvent('server-intent', { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"},{"code": "IShouldBeIgnored", "id": "IShouldBeIgnored"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"},{"intentCode": "IShouldBeIgnored", "id": "IShouldBeIgnored"}]}', }); mockStream.simulateEvent('put-object', { data: '{"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', diff --git a/packages/shared/common/__tests__/options/ServiceEndpoints.test.ts b/packages/shared/common/__tests__/options/ServiceEndpoints.test.ts index c08d09f8d1..30d7f39802 100644 --- a/packages/shared/common/__tests__/options/ServiceEndpoints.test.ts +++ b/packages/shared/common/__tests__/options/ServiceEndpoints.test.ts @@ -49,14 +49,14 @@ it('applies payload filter to polling and streaming endpoints', () => { 'filterKey', ); - expect(getStreamingUri(endpoints, '/all', [])).toEqual( - 'https://stream.launchdarkly.com/all?filter=filterKey', + expect(getStreamingUri(endpoints, '/sdk/stream', [])).toEqual( + 'https://stream.launchdarkly.com/sdk/stream?filter=filterKey', ); - expect(getPollingUri(endpoints, '/sdk/latest-all', [])).toEqual( - 'https://sdk.launchdarkly.com/sdk/latest-all?filter=filterKey', + expect(getPollingUri(endpoints, '/sdk/poll', [])).toEqual( + 'https://sdk.launchdarkly.com/sdk/poll?filter=filterKey', ); expect( - getPollingUri(endpoints, '/sdk/latest-all', [{ key: 'withReasons', value: 'true' }]), - ).toEqual('https://sdk.launchdarkly.com/sdk/latest-all?withReasons=true&filter=filterKey'); + getPollingUri(endpoints, '/sdk/poll', [{ key: 'withReasons', value: 'true' }]), + ).toEqual('https://sdk.launchdarkly.com/sdk/poll?withReasons=true&filter=filterKey'); expect(getEventsUri(endpoints, '/bulk', [])).toEqual('https://events.launchdarkly.com/bulk'); }); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index d87e7d3c8b..e7da96118f 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -8,6 +8,8 @@ import { CompositeDataSource, TransitionConditions, } from '../../../src/datasource/CompositeDataSource'; +import { DataSourceErrorKind } from '../../../src/datasource/DataSourceErrorKinds'; +import { LDFlagDeliveryFallbackError } from '../../../src/datasource/errors'; function makeDataSourceFactory(internal: DataSource): LDDataSourceFactory { return () => internal; @@ -75,6 +77,7 @@ it('handles initializer getting basis, switching to synchronizer', async () => { const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -169,6 +172,7 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -270,6 +274,7 @@ it('removes synchronizer that reports unrecoverable error and loops on remaining const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -306,6 +311,117 @@ it('removes synchronizer that reports unrecoverable error and loops on remaining expect(statusCallback).toHaveBeenNthCalledWith(10, DataSourceState.Valid, undefined); // sync1 valid }); +it('falls back to FDv1 synchronizers when FDv1 fallback error is reported', async () => { + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback( + DataSourceState.Closed, + new LDFlagDeliveryFallbackError( + DataSourceErrorKind.ErrorResponse, + `Response header indicates to fallback to FDv1`, + 403, + ), + ); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: 'I should NOT be called due to FDv1 Fallback', + }); + }, + ), + stop: jest.fn(), + }; + + const mockFDv1Data = { key: 'FDv1Data' }; + const mockFDv1Synchronizer = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid, null); // this should lead to recovery + _dataCallback(false, mockFDv1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + [makeDataSourceFactory(mockFDv1Synchronizer)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockFDv1Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(0); // this synchronizer should not be called because we fall back to FDv1 synchronizers instead + expect(mockFDv1Synchronizer.start).toHaveBeenCalledTimes(1); + expect(dataCallback).toHaveBeenCalledTimes(2); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(dataCallback).toHaveBeenNthCalledWith(2, false, { key: 'FDv1Data' }); + expect(statusCallback).toHaveBeenCalledTimes(5); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closed + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 fdv1 fallback error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined); // sync1 valid +}); + it('reports error when all initializers fail', async () => { const mockInitializer1Error = { name: 'Error', @@ -348,6 +464,7 @@ it('reports error when all initializers fail', async () => { const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], [], // no synchronizers for this test + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -445,6 +562,7 @@ it('it reports DataSourceState Closed when all synchronizers report Closed with const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -508,6 +626,7 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1)], // will continuously fallback onto itself + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -579,6 +698,7 @@ it('can be stopped and restarted', async () => { const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -625,6 +745,7 @@ it('can be stopped and restarted', async () => { it('is well behaved with no initializers and no synchronizers configured', async () => { const underTest = new CompositeDataSource( + [], [], [], undefined, @@ -671,6 +792,7 @@ it('is well behaved with no initializer and synchronizer configured', async () = const underTest = new CompositeDataSource( [], [makeDataSourceFactory(mockSynchronizer1)], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -712,6 +834,7 @@ it('is well behaved with an initializer and no synchronizers configured', async const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [], + [], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -774,6 +897,7 @@ it('consumes cancellation tokens correctly', async () => { const underTest = new CompositeDataSource( [makeDataSourceFactory(mockInitializer1)], [makeDataSourceFactory(mockSynchronizer1)], + [], undefined, { // pass in transition condition so that it will thrash, generating cancellation tokens repeatedly diff --git a/packages/shared/common/src/api/platform/EventSource.ts b/packages/shared/common/src/api/platform/EventSource.ts index f44fc830b2..982171c2f3 100644 --- a/packages/shared/common/src/api/platform/EventSource.ts +++ b/packages/shared/common/src/api/platform/EventSource.ts @@ -10,7 +10,7 @@ export type ProcessStreamResponse = { export interface EventSource { onclose: (() => void) | undefined; onerror: ((err?: HttpErrorResponse) => void) | undefined; - onopen: (() => void) | undefined; + onopen: ((e: { headers?: { [key: string]: string } }) => void) | undefined; onretrying: ((e: { delayMillis: number }) => void) | undefined; addEventListener(type: EventName, listener: EventListener): void; diff --git a/packages/shared/common/src/api/platform/Requests.ts b/packages/shared/common/src/api/platform/Requests.ts index 8b0438d20f..c4e98fe816 100644 --- a/packages/shared/common/src/api/platform/Requests.ts +++ b/packages/shared/common/src/api/platform/Requests.ts @@ -129,4 +129,5 @@ export interface Requests { export interface HttpErrorResponse { message: string; status?: number; + headers?: Record; } diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index e8eb4d0e42..bd9033687b 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -8,6 +8,7 @@ import { } from '../api/subsystem/DataSystem/DataSource'; import { Backoff, DefaultBackoff } from './Backoff'; import { DataSourceList } from './dataSourceList'; +import { LDFlagDeliveryFallbackError } from './errors'; const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; @@ -40,6 +41,7 @@ export class CompositeDataSource implements DataSource { private _initPhaseActive: boolean; private _initFactories: DataSourceList; private _syncFactories: DataSourceList; + private _fdv1Synchronizers: DataSourceList; private _stopped: boolean = true; private _externalTransitionPromise: Promise; @@ -49,10 +51,15 @@ export class CompositeDataSource implements DataSource { /** * @param initializers factories to create {@link DataSystemInitializer}s, in priority order. * @param synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. + * @param fdv1Synchronizers factories to fallback to if we need to fallback to FDv1. + * @param _logger for logging + * @param _transitionConditions to control automated transition between datasources. Typically only used for testing. + * @param _backoff to control delay between transitions. Typically only used for testing. */ constructor( initializers: LDDataSourceFactory[], synchronizers: LDDataSourceFactory[], + fdv1Synchronizers: LDDataSourceFactory[], private readonly _logger?: LDLogger, private readonly _transitionConditions: TransitionConditions = { [DataSourceState.Valid]: { @@ -72,6 +79,7 @@ export class CompositeDataSource implements DataSource { this._initPhaseActive = initializers.length > 0; // init phase if we have initializers this._initFactories = new DataSourceList(false, initializers); this._syncFactories = new DataSourceList(true, synchronizers); + this._fdv1Synchronizers = new DataSourceList(true, fdv1Synchronizers); } async start( @@ -132,9 +140,16 @@ export class CompositeDataSource implements DataSource { ); if (err || state === DataSourceState.Closed) { callbackHandler.disable(); - if (err.recoverable === false) { + if (err?.recoverable === false) { // don't use this datasource's factory again + this._logger?.debug(`Culling data source due to err ${err}`); cullDSFactory?.(); + + // this error indicates we should fallback to only using FDv1 synchronizers + if (err instanceof LDFlagDeliveryFallbackError) { + this._logger?.debug(`Falling back to FDv1`); + this._syncFactories = this._fdv1Synchronizers; + } } sanitizedStatusCallback(state, err); this._consumeCancelToken(cancelScheduledTransition); @@ -232,6 +247,7 @@ export class CompositeDataSource implements DataSource { this._initPhaseActive = this._initFactories.length() > 0; // init phase if we have initializers; this._initFactories.reset(); this._syncFactories.reset(); + this._fdv1Synchronizers.reset(); this._externalTransitionPromise = new Promise((tr) => { this._externalTransitionResolve = tr; }); diff --git a/packages/shared/common/src/datasource/errors.ts b/packages/shared/common/src/datasource/errors.ts index f2d1f7366b..ec46e74b64 100644 --- a/packages/shared/common/src/datasource/errors.ts +++ b/packages/shared/common/src/datasource/errors.ts @@ -36,4 +36,21 @@ export class LDStreamingError extends Error { } } +/** + * This is a short term error and will be removed once FDv2 adoption is sufficient. + */ +export class LDFlagDeliveryFallbackError extends Error { + public readonly kind: DataSourceErrorKind; + public readonly code?: number; + public readonly recoverable: boolean; + + constructor(kind: DataSourceErrorKind, message: string, code?: number) { + super(message); + this.kind = kind; + this.code = code; + this.name = 'LDFlagDeliveryFallbackError'; + this.recoverable = false; + } +} + export type StreamingErrorHandler = (err: LDStreamingError) => void; diff --git a/packages/shared/common/src/datasource/index.ts b/packages/shared/common/src/datasource/index.ts index 237da787e0..497a7588fe 100644 --- a/packages/shared/common/src/datasource/index.ts +++ b/packages/shared/common/src/datasource/index.ts @@ -3,6 +3,7 @@ import { CompositeDataSource } from './CompositeDataSource'; import { DataSourceErrorKind } from './DataSourceErrorKinds'; import { LDFileDataSourceError, + LDFlagDeliveryFallbackError, LDPollingError, LDStreamingError, StreamingErrorHandler, @@ -14,6 +15,7 @@ export { DefaultBackoff, DataSourceErrorKind, LDFileDataSourceError, + LDFlagDeliveryFallbackError, LDPollingError, LDStreamingError, StreamingErrorHandler, diff --git a/packages/shared/common/src/index.ts b/packages/shared/common/src/index.ts index 060ed1bf5a..9f88a96b07 100644 --- a/packages/shared/common/src/index.ts +++ b/packages/shared/common/src/index.ts @@ -7,6 +7,7 @@ import { DataSourceErrorKind, DefaultBackoff, LDFileDataSourceError, + LDFlagDeliveryFallbackError, LDPollingError, LDStreamingError, StreamingErrorHandler, @@ -33,4 +34,5 @@ export { LDStreamingError, StreamingErrorHandler, LDFileDataSourceError, + LDFlagDeliveryFallbackError, }; diff --git a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts index 0273ccacdb..1cd48cac7e 100644 --- a/packages/shared/common/src/internal/fdv2/payloadProcessor.ts +++ b/packages/shared/common/src/internal/fdv2/payloadProcessor.ts @@ -137,7 +137,7 @@ export class PayloadProcessor { // at the time of writing this, it was agreed upon that SDKs could assume exactly 1 element in this list. In the future, a negotiation of protocol version will be required to remove this assumption. const payload = data.payloads[0]; - switch (payload?.code) { + switch (payload?.intentCode) { case 'xfer-full': this._tempBasis = true; break; @@ -150,6 +150,7 @@ export class PayloadProcessor { break; default: // unrecognized intent code, return + this._logger?.warn(`Unable to process intent code '${payload?.intentCode}'.`); return; } diff --git a/packages/shared/common/src/internal/fdv2/proto.ts b/packages/shared/common/src/internal/fdv2/proto.ts index 2500a7e0ff..cb3c04bf08 100644 --- a/packages/shared/common/src/internal/fdv2/proto.ts +++ b/packages/shared/common/src/internal/fdv2/proto.ts @@ -12,7 +12,7 @@ export type IntentCode = 'xfer-full' | 'xfer-changes' | 'none'; export interface PayloadIntent { id: string; target: number; - code: IntentCode; + intentCode: IntentCode; reason: string; } diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts index d797a169b1..440912b9de 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -1,8 +1,7 @@ -import { DataSourceErrorKind, LDPollingError, subsystem } from '../../src'; +import { subsystem } from '../../src'; import OneShotInitializerFDv2 from '../../src/data_sources/OneShotInitializerFDv2'; -import PollingProcessorFDv2 from '../../src/data_sources/PollingProcessorFDv2'; import Requestor from '../../src/data_sources/Requestor'; -import TestLogger, { LogLevel } from '../Logger'; +import TestLogger from '../Logger'; describe('given a one shot initializer', () => { const requestor = { @@ -12,7 +11,7 @@ describe('given a one shot initializer', () => { events: [ { event: 'server-intent', - data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + data: { payloads: [{ intentCode: 'xfer-full', id: 'mockId' }] }, }, { event: 'put-object', diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts index 3cadd04e28..a1c35d51f3 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -8,11 +8,11 @@ describe('given an event processor', () => { requestAllData: jest.fn(), }; const longInterval = 100000; - const allEvents = { + const allFDv2Events = { events: [ { event: 'server-intent', - data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + data: { payloads: [{ intentCode: 'xfer-full', id: 'mockId' }] }, }, { event: 'put-object', @@ -29,7 +29,13 @@ describe('given an event processor', () => { }, ], }; - const jsonData = JSON.stringify(allEvents); + const fdv2JsonData = JSON.stringify(allFDv2Events); + + const allFDv1Data = { + flags: { flagA: { version: 456 } }, + segments: { segmentA: { version: 789 } }, + }; + const fdv1JsonData = JSON.stringify(allFDv1Data); let processor: PollingProcessorFDv2; const mockDataCallback = jest.fn(); @@ -60,7 +66,7 @@ describe('given an event processor', () => { }); it('calls callback on success', async () => { - requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + requestor.requestAllData = jest.fn((cb) => cb(undefined, fdv2JsonData)); let dataCallback; await new Promise((resolve) => { dataCallback = jest.fn(() => { @@ -85,8 +91,52 @@ describe('given an event processor', () => { version: 1, }); }); + + it('can process FDv1 data when configured to do so', async () => { + processor = new PollingProcessorFDv2( + requestor as unknown as Requestor, + longInterval, + new TestLogger(), + true, + ); + requestor.requestAllData = jest.fn((cb) => cb(undefined, fdv1JsonData)); + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + processor.start(dataCallback, mockStatusCallback); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { + basis: true, + id: `FDv1Fallback`, + state: `FDv1Fallback`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 456, + object: { version: 456 }, + }, + { + kind: `segment`, + key: `segmentA`, + version: 789, + object: { version: 789 }, + }, + ], + version: 1, + }); + }); }); +const allFDv1Data = { + flags: { flag: { version: 456 } }, + segments: { segment: { version: 789 } }, +}; + describe('given a polling processor with a short poll duration', () => { const requestor = { requestAllData: jest.fn(), diff --git a/packages/shared/sdk-server/__tests__/data_sources/Requestor.test.ts b/packages/shared/sdk-server/__tests__/data_sources/Requestor.test.ts index 3f3d8537a2..1fc70ef39b 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/Requestor.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/Requestor.test.ts @@ -91,7 +91,7 @@ describe('given a requestor', () => { expect(body).toEqual(testResponse); expect(requestsMade.length).toBe(1); - expect(requestsMade[0].url).toBe('https://sdk.launchdarkly.com/sdk/latest-all'); + expect(requestsMade[0].url).toBe('https://sdk.launchdarkly.com/sdk/poll'); expect(requestsMade[0].options.headers?.authorization).toBe('sdkKey'); done(); diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts index f2b7b21aad..0b6b1e6ba6 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts @@ -116,7 +116,7 @@ describe('given a stream processor with mock event source', () => { basicConfiguration: getBasicConfiguration(logger), platform: basicPlatform, }, - '/all', + '/sdk/stream', [], listeners, { @@ -139,7 +139,7 @@ describe('given a stream processor with mock event source', () => { it('uses expected uri and eventSource init args', () => { expect(basicPlatform.requests.createEventSource).toBeCalledWith( - `${serviceEndpoints.streaming}/all`, + `${serviceEndpoints.streaming}/sdk/stream`, { errorFilter: expect.any(Function), headers: defaultHeaders(sdkKey, info, undefined), @@ -156,7 +156,7 @@ describe('given a stream processor with mock event source', () => { basicConfiguration: getBasicConfiguration(logger), platform: basicPlatform, }, - '/all', + '/sdk/stream', [], listeners, { @@ -171,7 +171,7 @@ describe('given a stream processor with mock event source', () => { streamingProcessor.start(); expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( - `${serviceEndpoints.streaming}/all`, + `${serviceEndpoints.streaming}/sdk/stream`, { errorFilter: expect.any(Function), headers: defaultHeaders(sdkKey, info, undefined), diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts index 39aa6a02a1..b4fc4ba654 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -34,7 +34,7 @@ const dateNowString = '2023-08-10'; const sdkKey = 'my-sdk-key'; const events = { 'server-intent': { - data: '{"payloads": [{"code": "xfer-full", "id": "mockId"}]}', + data: '{"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}', }, 'put-object': { data: '{"kind": "flag", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}', @@ -108,7 +108,7 @@ describe('given a stream processor with mock event source', () => { basicConfiguration: getBasicConfiguration(logger), platform: basicPlatform, }, - '/all', + '/sdk/stream', [], { authorization: 'my-sdk-key', @@ -132,7 +132,7 @@ describe('given a stream processor with mock event source', () => { it('uses expected uri and eventSource init args', () => { expect(basicPlatform.requests.createEventSource).toBeCalledWith( - `${serviceEndpoints.streaming}/all`, + `${serviceEndpoints.streaming}/sdk/stream`, { errorFilter: expect.any(Function), headers: defaultHeaders(sdkKey, info, undefined), @@ -149,7 +149,7 @@ describe('given a stream processor with mock event source', () => { basicConfiguration: getBasicConfiguration(logger), platform: basicPlatform, }, - '/all', + '/sdk/stream', [], { authorization: 'my-sdk-key', @@ -162,7 +162,7 @@ describe('given a stream processor with mock event source', () => { streamingProcessor.start(jest.fn(), jest.fn()); expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( - `${serviceEndpoints.streaming}/all`, + `${serviceEndpoints.streaming}/sdk/stream`, { errorFilter: expect.any(Function), headers: defaultHeaders(sdkKey, info, undefined), diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 25eb64c0f2..78ce113241 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -58,7 +58,7 @@ import FlagsStateBuilder from './FlagsStateBuilder'; import HookRunner from './hooks/HookRunner'; import MigrationOpEventToInputEvent from './MigrationOpEventConversion'; import MigrationOpTracker from './MigrationOpTracker'; -import Configuration from './options/Configuration'; +import Configuration, { DEFAULT_POLL_INTERVAL } from './options/Configuration'; import VersionedDataKinds from './store/VersionedDataKinds'; const { ClientMessages, ErrorKinds, NullEventProcessor } = internal; @@ -260,7 +260,7 @@ export default class LDClientImpl implements LDClient { () => new StreamingProcessorFDv2( clientContext, - '/all', + '/sdk/stream', [], baseHeaders, this._diagnosticsManager, @@ -269,12 +269,13 @@ export default class LDClientImpl implements LDClient { ); } + let pollingInterval = DEFAULT_POLL_INTERVAL; // if polling is configured, add polling synchronizer if ( isStandardOptions(config.dataSystem.dataSource) || isPollingOnlyOptions(config.dataSystem.dataSource) ) { - const pollingInterval = config.dataSystem.dataSource.pollInterval; + pollingInterval = config.dataSystem.dataSource.pollInterval ?? DEFAULT_POLL_INTERVAL; synchronizers.push( () => new PollingProcessorFDv2( @@ -285,7 +286,23 @@ export default class LDClientImpl implements LDClient { ); } - this._dataSource = new CompositeDataSource(initializers, synchronizers, this.logger); + // This is short term handling and will be removed once FDv2 adoption is sufficient. + const fdv1FallbackSynchronizers = [ + () => + new PollingProcessorFDv2( + new Requestor(config, this._platform.requests, baseHeaders, "/sdk/latest-all"), + pollingInterval, + config.logger, + true, + ), + ]; + + this._dataSource = new CompositeDataSource( + initializers, + synchronizers, + fdv1FallbackSynchronizers, + this.logger, + ); const payloadListener = createPayloadListener(dataSourceUpdates, this.logger, () => { this._initSuccess(); }); @@ -294,8 +311,8 @@ export default class LDClientImpl implements LDClient { (_, payload) => { payloadListener(payload); }, - (_, err) => { - if (err) { + (state, err) => { + if (state == subsystem.DataSourceState.Closed && err) { this._dataSourceErrorHandler(err); } }, diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index dc0f9da02f..40f503d4b2 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -3,6 +3,7 @@ import { httpErrorMessage, internal, isHttpRecoverable, + LDFlagDeliveryFallbackError, LDLogger, LDPollingError, subsystem as subsystemCommon, @@ -10,7 +11,7 @@ import { import { Flag } from '../evaluation/data/Flag'; import { Segment } from '../evaluation/data/Segment'; -import { processFlag, processSegment } from '../store/serialization'; +import { FlagsAndSegments, processFlag, processSegment } from '../store/serialization'; import Requestor from './Requestor'; export type PollingErrorHandler = (err: LDPollingError) => void; @@ -24,10 +25,18 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource private _statusCallback?: (status: subsystemCommon.DataSourceState, err?: any) => void; + /** + * @param _requestor to fetch flags from cloud services + * @param _pollInterval in seconds controlling how frequently polling request is made + * @param _logger for logging + * @param _processResponseAsFDv1 defaults to false, but if set to true, this data source will process + * the response body as FDv1 and convert it into a FDv2 payload. + */ constructor( private readonly _requestor: Requestor, private readonly _pollInterval: number = 30, private readonly _logger?: LDLogger, + private readonly _processResponseAsFDv1: boolean = false, ) {} private _poll( @@ -47,6 +56,14 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource this._logger?.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor); if (err) { const { status } = err; + // this is a short term error and will be removed once FDv2 adoption is sufficient. + if (err instanceof LDFlagDeliveryFallbackError) { + this._logger?.error(err.message); + statusCallback(subsystemCommon.DataSourceState.Closed, err); + // It is not recoverable, return and do not trigger another poll. + return; + } + if (status && !isHttpRecoverable(status)) { const message = httpErrorMessage(err, 'polling request'); this._logger?.error(message); @@ -87,7 +104,6 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource } try { - const parsed = JSON.parse(body) as internal.FDv2EventsCollection; const payloadProcessor = new internal.PayloadProcessor( { flag: (flag: Flag) => { @@ -112,7 +128,15 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource dataCallback(payload.basis, payload); }); - payloadProcessor.processEvents(parsed.events); + if (!this._processResponseAsFDv1) { + // FDv2 case + const parsed = JSON.parse(body) as internal.FDv2EventsCollection; + payloadProcessor.processEvents(parsed.events); + } else { + // FDv1 case + const parsed = JSON.parse(body) as FlagsAndSegments; + this._processFDv1FlagsAndSegments(payloadProcessor, parsed); + } // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API statusCallback(subsystemCommon.DataSourceState.Valid); @@ -134,6 +158,65 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource }); } + // helper function to transform FDv1 response data into events the PayloadProcessor can parse + private _processFDv1FlagsAndSegments( + payloadProcessor: internal.PayloadProcessor, + data: FlagsAndSegments, + ) { + payloadProcessor.processEvents([ + { + event: `server-intent`, + data: { + payloads: [ + { + id: `FDv1Fallback`, + target: 1, + intentCode: `xfer-full`, + }, + ], + }, + }, + ]); + + Object.entries(data?.flags || []).forEach(([key, flag]) => { + payloadProcessor.processEvents([ + { + event: `put-object`, + data: { + kind: 'flag', + key, + version: flag.version, + object: flag, + }, + }, + ]); + }); + + Object.entries(data?.segments || []).forEach(([key, segment]) => { + payloadProcessor.processEvents([ + { + event: `put-object`, + data: { + kind: 'segment', + key, + version: segment.version, + object: segment, + }, + }, + ]); + }); + + payloadProcessor.processEvents([ + { + event: `payload-transferred`, + data: { + state: `FDv1Fallback`, + version: 1, + }, + }, + ]); + } + start( dataCallback: (basis: boolean, data: any) => void, statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, diff --git a/packages/shared/sdk-server/src/data_sources/Requestor.ts b/packages/shared/sdk-server/src/data_sources/Requestor.ts index 0d3567eae8..4eac943c80 100644 --- a/packages/shared/sdk-server/src/data_sources/Requestor.ts +++ b/packages/shared/sdk-server/src/data_sources/Requestor.ts @@ -1,6 +1,7 @@ import { DataSourceErrorKind, getPollingUri, + LDFlagDeliveryFallbackError, LDHeaders, LDPollingError, Options, @@ -31,9 +32,10 @@ export default class Requestor implements LDFeatureRequestor { config: Configuration, private readonly _requests: Requests, baseHeaders: LDHeaders, + path: string = '/sdk/poll', ) { this._headers = { ...baseHeaders }; - this._uri = getPollingUri(config.serviceEndpoints, '/sdk/latest-all', []); + this._uri = getPollingUri(config.serviceEndpoints, path, []); } /** @@ -77,6 +79,15 @@ export default class Requestor implements LDFeatureRequestor { }; try { const { res, body } = await this._requestWithETagCache(this._uri, options); + if (res.headers.get(`x-ld-fd-fallback`) === `true`) { + const err = new LDFlagDeliveryFallbackError( + DataSourceErrorKind.ErrorResponse, + `Response header indicates to fallback to FDv1.`, + res.status, + ); + return cb(err, undefined); + } + if (res.status !== 200 && res.status !== 304) { const err = new LDPollingError( DataSourceErrorKind.ErrorResponse, diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts index 87c609acd6..4243a893bf 100644 --- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -6,6 +6,7 @@ import { httpErrorMessage, HttpErrorResponse, internal, + LDFlagDeliveryFallbackError, LDHeaders, LDLogger, LDStreamingError, @@ -78,6 +79,17 @@ export default class StreamingProcessorFDv2 implements subsystemCommon.DataSourc err: HttpErrorResponse, statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, ) { + // this is a short term error and will be removed once FDv2 adoption is sufficient. + if (err.headers?.[`x-ld-fd-fallback`] === `true`) { + const fallbackErr = new LDFlagDeliveryFallbackError( + DataSourceErrorKind.ErrorResponse, + `Response header indicates to fallback to FDv1`, + err.status, + ); + statusCallback(subsystemCommon.DataSourceState.Closed, fallbackErr); + return false; + } + if (!shouldRetry(err)) { this._logger?.error(httpErrorMessage(err, 'streaming request')); this._logConnectionResult(false); diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index bfc3d79cd0..6cdb9bdcf7 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -71,7 +71,7 @@ const validations: Record = { type: TypeValidators.String, }; -const DEFAULT_POLL_INTERVAL = 30; +export const DEFAULT_POLL_INTERVAL = 30; const DEFAULT_STREAM_RECONNECT_DELAY = 1; const defaultStandardDataSourceOptions: StandardDataSourceOptions = {