From c5ef0d447aad794a45fe226d81216a779dbe5aa5 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 17 Apr 2025 17:33:31 -0500 Subject: [PATCH 1/4] feat: CompositeDataSource blacklists DataSources that report Off --- .../datasource/dataSourceList.test.ts | 127 +++++++ .../DataSystem/CompositeDataSource.test.ts | 353 ++++++++++++++---- .../api/subsystem/DataSystem/DataSource.ts | 29 +- .../src/api/subsystem/DataSystem/index.ts | 9 +- .../shared/common/src/api/subsystem/index.ts | 14 +- .../src/datasource/CompositeDataSource.ts | 184 ++++++--- .../common/src/datasource/dataSourceList.ts | 102 +++++ .../OneShotInitializerFDv2.test.ts | 5 +- .../shared/sdk-server/src/LDClientImpl.ts | 4 +- .../data_sources/OneShotInitializerFDv2.ts | 9 +- .../src/data_sources/PollingProcessorFDv2.ts | 13 +- .../data_sources/StreamingProcessorFDv2.ts | 4 +- 12 files changed, 681 insertions(+), 172 deletions(-) create mode 100644 packages/shared/common/__tests__/datasource/dataSourceList.test.ts create mode 100644 packages/shared/common/src/datasource/dataSourceList.ts diff --git a/packages/shared/common/__tests__/datasource/dataSourceList.test.ts b/packages/shared/common/__tests__/datasource/dataSourceList.test.ts new file mode 100644 index 0000000000..398eb7b503 --- /dev/null +++ b/packages/shared/common/__tests__/datasource/dataSourceList.test.ts @@ -0,0 +1,127 @@ +import { DataSourceList } from '../../src/datasource/dataSourceList'; + +it('replace is well behaved', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + underTest.replace([4, 5, 6]); + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); +}); + +it('it cycles correctly after replacing non-empty list', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(2); + + underTest.remove(2); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); + + underTest.replace([4, 5, 6]); + + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + + underTest.remove(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + + underTest.remove(6); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(5); + + underTest.remove(5); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); +}); + +it('cycles correctly after replacing empty list', async () => { + const underTest = new DataSourceList(true, []); + + underTest.replace([1, 2, 3]); + + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(2); + + underTest.remove(2); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); +}); + +it('removing head is well behaved at start', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + // head is now pointing to 1 + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); +}); + +it('removing head is well behaved in middle', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + // head is now pointing to 2 + underTest.remove(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(3); +}); + +it('removing head is well behaved at end', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + // head is now pointing to 3 + underTest.remove(3); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(1); +}); + +it('removing existing returns true', async () => { + const underTest = new DataSourceList(true, [1]); + expect(underTest.remove(1)).toEqual(true); + expect(underTest.next()).toBeUndefined(); +}); + +it('removing nonexistent returns false', async () => { + const underTest = new DataSourceList(true, []); + expect(underTest.remove(1)).toEqual(false); + expect(underTest.next()).toBeUndefined(); +}); + +it('single element removed and next called', async () => { + const underTest = new DataSourceList(true, [1]); + expect(underTest.remove(1)).toEqual(true); + expect(underTest.next()).toBeUndefined(); +}); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index a83025808f..320f18304b 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1,9 +1,7 @@ import { + DataSource, DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, } from '../../../src/api/subsystem/DataSystem/DataSource'; import { Backoff } from '../../../src/datasource/Backoff'; import { @@ -11,31 +9,19 @@ import { TransitionConditions, } from '../../../src/datasource/CompositeDataSource'; -function makeInitializerFactory(internal: DataSystemInitializer): LDInitializerFactory { - return () => internal; -} - -function makeSynchronizerFactory(internal: DataSystemSynchronizer): LDSynchronizerFactory { +function makeDataSourceFactory(internal: DataSource): LDDataSourceFactory { return () => internal; } function makeTestTransitionConditions(): TransitionConditions { return { - [DataSourceState.Initializing]: { - durationMS: 0, - transition: 'fallback', - }, [DataSourceState.Interrupted]: { durationMS: 0, transition: 'fallback', }, - [DataSourceState.Closed]: { - durationMS: 0, - transition: 'fallback', - }, [DataSourceState.Valid]: { durationMS: 0, - transition: 'fallback', + transition: 'recover', }, }; } @@ -51,7 +37,9 @@ function makeZeroBackoff(): Backoff { }; } -it('handles initializer getting basis, switching to syncrhonizer', async () => { +// TODO: go through tests and tune status reporting to verify composite data source is correctly coalescing/masking status during transitions. + +it('handles initializer getting basis, switching to synchronizer', async () => { const mockInitializer1 = { start: jest .fn() @@ -60,7 +48,10 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), @@ -75,6 +66,8 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); }, ), @@ -82,33 +75,39 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); - let callback; + let dataCallback; + const statusCallback = jest.fn(); await new Promise((resolve) => { - callback = jest.fn((_: boolean, data: any) => { + dataCallback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { resolve(); } }); - underTest.start(callback, jest.fn()); + underTest.start(dataCallback, statusCallback); }); expect(mockInitializer1.start).toHaveBeenCalledTimes(1); expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); - expect(callback).toHaveBeenCalledTimes(2); - expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); + expect(dataCallback).toHaveBeenCalledTimes(2); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(dataCallback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); + expect(statusCallback).toHaveBeenCalledTimes(4); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Valid, undefined); }); it('handles initializer getting basis, switches to synchronizer 1, falls back to synchronizer 2, recovers to synchronizer 1', async () => { - const mockInitializer1: DataSystemInitializer = { + const mockInitializer1: DataSource = { start: jest .fn() .mockImplementation( @@ -116,7 +115,10 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), @@ -133,11 +135,14 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _statusCallback: (status: DataSourceState, err?: any) => void, ) => { if (sync1RunCount === 0) { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, { name: 'Error', message: 'I am error...man!', }); // error that will lead to fallback } else { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); // second start will lead to data } sync1RunCount += 1; @@ -155,22 +160,24 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _dataCallback(false, mockSynchronizer2Data); + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Valid, null); // this should lead to recovery + _dataCallback(false, mockSynchronizer2Data); }, ), stop: jest.fn(), }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let callback; + const statusCallback = jest.fn(); await new Promise((resolve) => { callback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { @@ -178,7 +185,7 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to } }); - underTest.start(callback, jest.fn()); + underTest.start(callback, statusCallback); }); expect(mockInitializer1.start).toHaveBeenCalledTimes(1); @@ -188,6 +195,116 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); // sync1 errors and fallsback expect(callback).toHaveBeenNthCalledWith(3, false, { key: 'sync1' }); // sync2 recovers back to sync1 + expect(statusCallback).toHaveBeenCalledTimes(7); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closed + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined); // sync2 got data + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Interrupted, undefined); // recover to sync1 + expect(statusCallback).toHaveBeenNthCalledWith(7, DataSourceState.Valid, undefined); // sync1 valid +}); + +it('removes synchronizer that reports unrecoverable error and loops on remaining synchronizer', async () => { + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Off, { + name: 'Error', + message: 'I am an unrecoverable error!', // error will lead to culling + }); + }, + ), + stop: jest.fn(), + }; + + let sync2RunCount = 0; + const mockSynchronizer2Data = { key: 'sync2' }; + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + if (sync2RunCount < 5) { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: `I am a recoverable error ${sync2RunCount}`, + }); // error that will lead to fallback + } else { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer2Data); // second start will lead to data + } + sync2RunCount += 1; + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer2Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); // only called the first time + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(6); // called 5 times with recoverable errors then 6th time succeeds + expect(dataCallback).toHaveBeenCalledTimes(2); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(dataCallback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); + expect(statusCallback).toHaveBeenCalledTimes(10); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closed + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(7, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(8, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(9, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(10, DataSourceState.Valid, undefined); // sync1 valid }); it('reports error when all initializers fail', async () => { @@ -195,7 +312,7 @@ it('reports error when all initializers fail', async () => { name: 'Error', message: 'I am initializer1 error!', }; - const mockInitializer1: DataSystemInitializer = { + const mockInitializer1: DataSource = { start: jest .fn() .mockImplementation( @@ -203,6 +320,7 @@ it('reports error when all initializers fail', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, mockInitializer1Error); }, ), @@ -213,7 +331,7 @@ it('reports error when all initializers fail', async () => { name: 'Error', message: 'I am initializer2 error!', }; - const mockInitializer2: DataSystemInitializer = { + const mockInitializer2: DataSource = { start: jest .fn() .mockImplementation( @@ -221,6 +339,7 @@ it('reports error when all initializers fail', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, mockInitializer2Error); }, ), @@ -228,7 +347,7 @@ it('reports error when all initializers fail', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], [], // no synchronizers for this test undefined, makeTestTransitionConditions(), @@ -250,24 +369,107 @@ it('reports error when all initializers fail', async () => { expect(mockInitializer1.start).toHaveBeenCalledTimes(1); expect(mockInitializer2.start).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledTimes(0); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); expect(statusCallback).toHaveBeenNthCalledWith( - 1, + 2, DataSourceState.Interrupted, mockInitializer1Error, ); expect(statusCallback).toHaveBeenNthCalledWith( - 2, + 3, DataSourceState.Interrupted, mockInitializer2Error, ); - expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', message: 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', }); - expect(statusCallback).toHaveBeenCalledTimes(3); + expect(statusCallback).toHaveBeenCalledTimes(4); }); +// it('it reports DataSourceState Off when all synchronizers report Off', async () => { +// const mockInitializer1: DataSource = { +// start: jest +// .fn() +// .mockImplementation( +// ( +// _dataCallback: (basis: boolean, data: any) => void, +// _statusCallback: (status: DataSourceState, err?: any) => void, +// ) => { +// _statusCallback(DataSourceState.Initializing); +// _statusCallback(DataSourceState.Valid); +// _dataCallback(true, { key: 'init1' }); +// _statusCallback(DataSourceState.Closed); +// }, +// ), +// stop: jest.fn(), +// }; + +// const mockSynchronizer1 = { +// start: jest +// .fn() +// .mockImplementation( +// ( +// _dataCallback: (basis: boolean, data: any) => void, +// _statusCallback: (status: DataSourceState, err?: any) => void, +// ) => { +// _statusCallback(DataSourceState.Initializing); +// _statusCallback(DataSourceState.Off, { +// name: 'Error1', +// message: 'I am an unrecoverable error!', +// }); +// }, +// ), +// stop: jest.fn(), +// }; + +// const mockSynchronizer2 = { +// start: jest +// .fn() +// .mockImplementation( +// ( +// _dataCallback: (basis: boolean, data: any) => void, +// _statusCallback: (status: DataSourceState, err?: any) => void, +// ) => { +// _statusCallback(DataSourceState.Initializing); +// _statusCallback(DataSourceState.Off, { +// name: 'Error2', +// message: 'I am an unrecoverable error!', +// }); +// }, +// ), +// stop: jest.fn(), +// }; + +// const underTest = new CompositeDataSource( +// [makeDataSourceFactory(mockInitializer1)], +// [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], +// undefined, +// makeTestTransitionConditions(), +// makeZeroBackoff(), +// ); + +// let statusCallback; +// await new Promise((resolve) => { +// statusCallback = jest.fn((state: DataSourceState, err: any) => { +// if (err && err.name === 'Error2') { +// resolve(); +// } +// }); + +// underTest.start(jest.fn(), statusCallback); +// }); + +// expect(mockInitializer1.start).toHaveBeenCalledTimes(1); +// expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); +// expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); +// expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, null); +// expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, null); +// expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, expect.anything()); +// expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Off, null); +// }); + it('can be stopped when in thrashing synchronizer fallback loop', async () => { const mockInitializer1 = { start: jest @@ -299,8 +501,8 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], // will continuously fallback onto itself undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -309,8 +511,8 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { const dataCallback = jest.fn(); let statusCallback; await new Promise((resolve) => { - statusCallback = jest.fn((state: DataSourceState, _: any) => { - if (state === DataSourceState.Interrupted) { + statusCallback = jest.fn((state: DataSourceState, err: any) => { + if (state === DataSourceState.Interrupted && err === mockSynchronizer1Error) { resolve(); // waiting interruption due to sync error } }); @@ -328,12 +530,14 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { setTimeout(f, 100); }); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Interrupted, undefined); // initializer closes expect(statusCallback).toHaveBeenNthCalledWith( - 1, + 3, DataSourceState.Interrupted, mockSynchronizer1Error, - ); - expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, undefined); + ); // synchronizer error + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, undefined); // stop composite source }); it('can be stopped and restarted', async () => { @@ -368,8 +572,8 @@ it('can be stopped and restarted', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -425,14 +629,17 @@ it('is well behaved with no initializers and no synchronizers configured', async let statusCallback; await new Promise((resolve) => { - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } }); underTest.start(jest.fn(), statusCallback); }); - expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, { name: 'ExhaustedDataSources', message: 'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).', @@ -449,6 +656,8 @@ it('is well behaved with no initializer and synchronizer configured', async () = _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); }, ), @@ -457,22 +666,25 @@ it('is well behaved with no initializer and synchronizer configured', async () = const underTest = new CompositeDataSource( [], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let dataCallback; + const statusCallback = jest.fn(); await new Promise((resolve) => { dataCallback = jest.fn(() => { resolve(); }); - underTest.start(dataCallback, jest.fn()); + underTest.start(dataCallback, statusCallback); }); expect(dataCallback).toHaveBeenNthCalledWith(1, false, { key: 'sync1' }); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); }); it('is well behaved with an initializer and no synchronizers configured', async () => { @@ -484,14 +696,17 @@ it('is well behaved with an initializer and no synchronizers configured', async _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], + [makeDataSourceFactory(mockInitializer1)], [], undefined, makeTestTransitionConditions(), @@ -502,15 +717,20 @@ it('is well behaved with an initializer and no synchronizers configured', async let statusCallback; await new Promise((resolve) => { dataCallback = jest.fn(); - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } }); underTest.start(dataCallback, statusCallback); }); expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', message: 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', @@ -541,6 +761,7 @@ it('consumes cancellation tokens correctly', async () => { _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _statusCallback(DataSourceState.Interrupted); // report interrupted to schedule automatic transition and create cancellation token }, ), @@ -548,11 +769,11 @@ it('consumes cancellation tokens correctly', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, { - // pass in transition condition of 0 so that it will thrash, generating cancellation tokens repeatedly + // pass in transition condition so that it will thrash, generating cancellation tokens repeatedly [DataSourceState.Interrupted]: { durationMS: 100, transition: 'fallback', @@ -566,11 +787,13 @@ it('consumes cancellation tokens correctly', async () => { let interruptedCount = 0; await new Promise((resolve) => { dataCallback = jest.fn(); - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - interruptedCount += 1; - if (interruptedCount > 10) { - // let it thrash for N iterations - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Interrupted) { + interruptedCount += 1; + if (interruptedCount > 10) { + // let it thrash for N iterations + resolve(); + } } }); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index f7a5834761..fd9df9cc1c 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -1,15 +1,24 @@ // TODO: refactor client-sdk to use this enum export enum DataSourceState { - // Spinning up to make first connection attempt - Initializing, // Positive confirmation of connection/data receipt Valid, + // Spinning up to make first connection attempt + Initializing, // Transient issue, automatic retry is expected Interrupted, - // Permanent issue, external intervention required + // Data source was closed and will not retry Closed, + // This datasource encountered an unrecoverable error and it is not expected to be resolved through trying again in the future + Off, } +// Matthew: include some designation with the Off status that indicates we should fall back to FDv1. +// Expand existing FDv1 polling source and add translation layer in that implementation. +// If FDv1 is also failing, then data system terminates/closes like it would if all FDv2 sources were failing. +// If any FDv2 source indicates to fall back to FDv1, drop all FDv2 sources. + +// Resume here + export interface DataSource { /** * May be called any number of times, if already started, has no effect @@ -28,16 +37,4 @@ export interface DataSource { stop(): void; } -export type LDInitializerFactory = () => DataSystemInitializer; - -export type LDSynchronizerFactory = () => DataSystemSynchronizer; - -/** - * A data source that can be used to fetch the basis. - */ -export interface DataSystemInitializer extends DataSource {} - -/** - * A data source that can be used to fetch the basis or ongoing data changes. - */ -export interface DataSystemSynchronizer extends DataSource {} +export type LDDataSourceFactory = () => DataSource; diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts index 3ad2737e2d..0b9e1141b8 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/index.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -1,8 +1 @@ -export { - DataSource, - DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, -} from './DataSource'; +export { DataSource, DataSourceState, LDDataSourceFactory } from './DataSource'; diff --git a/packages/shared/common/src/api/subsystem/index.ts b/packages/shared/common/src/api/subsystem/index.ts index 7dc2af969b..bace160f64 100644 --- a/packages/shared/common/src/api/subsystem/index.ts +++ b/packages/shared/common/src/api/subsystem/index.ts @@ -1,11 +1,4 @@ -import { - DataSource, - DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, -} from './DataSystem'; +import { DataSource, DataSourceState, LDDataSourceFactory } from './DataSystem'; import LDContextDeduplicator from './LDContextDeduplicator'; import LDEventProcessor from './LDEventProcessor'; import LDEventSender, { LDDeliveryStatus, LDEventSenderResult, LDEventType } from './LDEventSender'; @@ -14,10 +7,7 @@ import { LDStreamProcessor } from './LDStreamProcessor'; export { DataSource, DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, LDEventProcessor, LDContextDeduplicator, LDEventSender, diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index d831463124..984246b7e9 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -4,10 +4,10 @@ import { CallbackHandler } from '../api/subsystem/DataSystem/CallbackHandler'; import { DataSource, DataSourceState, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, } from '../api/subsystem/DataSystem/DataSource'; import { Backoff, DefaultBackoff } from './Backoff'; +import { DataSourceList } from './dataSourceList'; const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; @@ -15,7 +15,7 @@ const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; /** * Represents a transition between data sources. */ -export type Transition = 'none' | 'switchToSync' | 'fallback' | 'recover' | 'stop'; +export type Transition = 'switchToSync' | 'fallback' | 'recover' | 'stop'; /** * Given a {@link DataSourceState}, how long to wait before transitioning. @@ -38,7 +38,8 @@ export class CompositeDataSource implements DataSource { // TODO: SDK-1044 utilize selector from initializers private _initPhaseActive: boolean; - private _currentPosition: number; + private _initFactories: DataSourceList; + private _syncFactories: DataSourceList; private _stopped: boolean = true; private _externalTransitionPromise: Promise; @@ -46,12 +47,12 @@ export class CompositeDataSource implements DataSource { private _cancelTokens: (() => void)[] = []; /** - * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. - * @param _synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. + * @param initializers factories to create {@link DataSystemInitializer}s, in priority order. + * @param synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. */ constructor( - private readonly _initializers: LDInitializerFactory[], - private readonly _synchronizers: LDSynchronizerFactory[], + initializers: LDDataSourceFactory[], + synchronizers: LDDataSourceFactory[], private readonly _logger?: LDLogger, private readonly _transitionConditions: TransitionConditions = { [DataSourceState.Valid]: { @@ -63,16 +64,14 @@ export class CompositeDataSource implements DataSource { transition: 'fallback', }, }, - private readonly _backoff: Backoff = new DefaultBackoff( - 1000, // TODO SDK-1137: handle blacklisting perpetually failing sources - 30000, - ), + private readonly _backoff: Backoff = new DefaultBackoff(1000, 30000), ) { this._externalTransitionPromise = new Promise((resolveTransition) => { this._externalTransitionResolve = resolveTransition; }); - this._initPhaseActive = _initializers.length > 0; // init phase if we have initializers - this._currentPosition = 0; + this._initPhaseActive = initializers.length > 0; // init phase if we have initializers + this._initFactories = new DataSourceList(false, initializers); + this._syncFactories = new DataSourceList(true, synchronizers); } async start( @@ -87,12 +86,22 @@ export class CompositeDataSource implements DataSource { this._stopped = false; this._logger?.debug( - `CompositeDataSource starting with (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + `CompositeDataSource starting with (${this._initFactories.length()} initializers, ${this._syncFactories.length()} synchronizers).`, ); - let lastTransition: Transition = 'none'; + + // this wrapper turns status updates from underlying data sources into a valid series of status updates for the consumer of this + // composite data source + const sanitizedStatusCallback = this._wrapStatusCallbackWithSanitizer(statusCallback); + sanitizedStatusCallback(DataSourceState.Initializing); + + let lastTransition: Transition | undefined; // eslint-disable-next-line no-constant-condition while (true) { - const currentDS: DataSource | undefined = this._pickDataSource(lastTransition); + const { + dataSource: currentDS, + isPrimary, + cullDSFactory, + } = this._pickDataSource(lastTransition); const internalTransitionPromise = new Promise((transitionResolve) => { if (currentDS) { // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after @@ -109,6 +118,7 @@ export class CompositeDataSource implements DataSource { // transition to sync if we get basis during init callbackHandler.disable(); this._consumeCancelToken(cancelScheduledTransition); + sanitizedStatusCallback(DataSourceState.Interrupted); transitionResolve({ transition: 'switchToSync' }); } }, @@ -119,18 +129,23 @@ export class CompositeDataSource implements DataSource { this._logger?.debug( `CompositeDataSource received state ${state} from underlying data source.`, ); - if (err || state === DataSourceState.Closed) { + if (err || state === DataSourceState.Closed || state === DataSourceState.Off) { callbackHandler.disable(); - statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition + if (state === DataSourceState.Off) { + // don't use this datasource's factory again + cullDSFactory?.(); + } + sanitizedStatusCallback(state, err); this._consumeCancelToken(cancelScheduledTransition); transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback } else { - statusCallback(state, null); // report the status upward + sanitizedStatusCallback(state); if (state !== lastState) { lastState = state; this._consumeCancelToken(cancelScheduledTransition); // cancel previously scheduled status transition if one was scheduled - const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it - const condition = this._lookupTransitionCondition(state, excludeRecovery); + + // primary source cannot recover to itself, so exclude it + const condition = this._lookupTransitionCondition(state, isPrimary); if (condition) { const { promise, cancel } = this._cancellableDelay(condition.durationMS); cancelScheduledTransition = cancel; @@ -138,6 +153,7 @@ export class CompositeDataSource implements DataSource { promise.then(() => { this._consumeCancelToken(cancel); callbackHandler.disable(); + sanitizedStatusCallback(DataSourceState.Interrupted); transitionResolve({ transition: condition.transition }); }); } else { @@ -157,7 +173,7 @@ export class CompositeDataSource implements DataSource { transition: 'stop', err: { name: 'ExhaustedDataSources', - message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + message: `CompositeDataSource has exhausted all configured datasources (${this._initFactories.length()} initializers, ${this._syncFactories.length()} synchronizers).`, }, }); } @@ -192,14 +208,12 @@ export class CompositeDataSource implements DataSource { this._consumeCancelToken(cancelDelay); } + lastTransition = transitionRequest.transition; if (transitionRequest.transition === 'stop') { - // exit the loop + // exit the loop, this is intentionally not the sanitized status callback statusCallback(DataSourceState.Closed, transitionRequest.err); - lastTransition = transitionRequest.transition; break; } - - lastTransition = transitionRequest.transition; } // reset so that run can be called again in the future @@ -214,52 +228,70 @@ export class CompositeDataSource implements DataSource { private _reset() { this._stopped = true; - this._initPhaseActive = this._initializers.length > 0; // init phase if we have initializers; - this._currentPosition = 0; + this._initPhaseActive = this._initFactories.length() > 0; // init phase if we have initializers; + this._initFactories.reset(); + this._syncFactories.reset(); this._externalTransitionPromise = new Promise((tr) => { this._externalTransitionResolve = tr; }); // intentionally not resetting the backoff to avoid a code path that could circumvent throttling } - private _pickDataSource(transition: Transition | undefined): DataSource | undefined { + /** + * Determines the next datasource and returns that datasource as well as a closure to cull the + * datasource from the datasource lists. One example where the cull closure is invoked is if the + * datasource has an unrecoverable error. + */ + private _pickDataSource(transition?: Transition): { + dataSource: DataSource | undefined; + isPrimary: boolean; + cullDSFactory: (() => void) | undefined; + } { + let factory: LDDataSourceFactory | undefined; + let isPrimary: boolean; switch (transition) { case 'switchToSync': this._initPhaseActive = false; // one way toggle to false, unless this class is reset() - this._currentPosition = 0; - break; - case 'fallback': - this._currentPosition += 1; + this._syncFactories.reset(); + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); break; case 'recover': - this._currentPosition = 0; + if (this._initPhaseActive) { + this._initFactories.reset(); + isPrimary = this._initFactories.pos() === 0; + factory = this._initFactories.next(); + } else { + this._syncFactories.reset(); + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); + } break; - case 'none': + case 'fallback': default: - // don't do anything in this case + if (this._initPhaseActive) { + isPrimary = this._initFactories.pos() === 0; + factory = this._initFactories.next(); + } else { + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); + } break; } - if (this._initPhaseActive) { - // We don't loop back through initializers, so if outside range of initializers, instead return undefined. - if (this._currentPosition > this._initializers.length - 1) { - return undefined; - } - - return this._initializers[this._currentPosition](); + if (!factory) { + return { dataSource: undefined, isPrimary, cullDSFactory: undefined }; } - // getting here indicates we are using a synchronizer - // if no synchronizers, return undefined - if (this._synchronizers.length <= 0) { - return undefined; - } - this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start if necessary - if (this._currentPosition > this._synchronizers.length - 1) { - // this is only possible if no synchronizers were provided - return undefined; - } - return this._synchronizers[this._currentPosition](); + return { + dataSource: factory(), + isPrimary, + cullDSFactory: () => { + if (factory) { + this._syncFactories.remove(factory); + } + }, + }; } /** @@ -273,7 +305,7 @@ export class CompositeDataSource implements DataSource { const condition = this._transitionConditions[state]; // exclude recovery can happen for certain initializers/synchronizers (ex: the primary synchronizer shouldn't recover to itself) - if (!condition || (excludeRecover && condition.transition === 'recover')) { + if (excludeRecover && condition?.transition === 'recover') { return undefined; } @@ -303,4 +335,44 @@ export class CompositeDataSource implements DataSource { this._cancelTokens.splice(index, 1); } } + + /** + * This wrapper will ensure the following: + * + * Don't report DataSourceState.Initializing except as first status callback. + * Map underlying DataSourceState.Closed and DataSourceState.Off to interrupted. + * Don't report the same status and error twice in a row. + */ + private _wrapStatusCallbackWithSanitizer( + statusCallback: (status: DataSourceState, err?: any) => void, + ): (status: DataSourceState, err?: any) => void { + let alreadyReportedInitializing = false; + let lastStatus: DataSourceState | undefined; + let lastErr: any; + + return (status: DataSourceState, err?: any) => { + let sanitized = status; + // underlying errors, closed state, or off are masked as interrupted while we transition + if (status === DataSourceState.Closed || status === DataSourceState.Off) { + sanitized = DataSourceState.Interrupted; + } + + // don't report the same combination of values twice in a row + if (sanitized === lastStatus && err === lastErr) { + return; + } + + if (sanitized === DataSourceState.Initializing) { + // don't report initializing again if that has already been reported + if (alreadyReportedInitializing) { + return; + } + alreadyReportedInitializing = true; + } + + lastStatus = sanitized; + lastErr = err; + statusCallback(sanitized, err); + }; + } } diff --git a/packages/shared/common/src/datasource/dataSourceList.ts b/packages/shared/common/src/datasource/dataSourceList.ts new file mode 100644 index 0000000000..e5e740265e --- /dev/null +++ b/packages/shared/common/src/datasource/dataSourceList.ts @@ -0,0 +1,102 @@ +/** + * Helper class for {@link CompositeDataSource} to manage iterating on data sources and removing them on the fly. + */ +export class DataSourceList { + private _list: T[]; + private _circular: boolean; + private _pos: number; + + /** + * @param circular whether to loop off the end of the list back to the start + * @param initialList of content + */ + constructor(circular: boolean, initialList?: T[]) { + this._list = initialList ? [...initialList] : []; + this._circular = circular; + this._pos = 0; + } + + /** + * Returns the current head and then iterates. + */ + next(): T | undefined { + if (this._list.length <= 0 || this._pos >= this._list.length) { + return undefined; + } + + const result = this._list[this._pos]; + + if (this._circular) { + this._pos = (this._pos + 1) % this._list.length; + } else { + this._pos += 1; + } + + return result; + } + + /** + * Replaces all elements with the provided list and resets the position of head to the start. + * + * @param input that will replace existing list + */ + replace(input: T[]): void { + this._list = [...input]; + this._pos = 0; + } + + /** + * Removes the provided element from the list. If the removed element was the head, head moves to next. Consider head may be undefined if list is empty after removal. + * + * @param element to remove + * @returns true if element was removed + */ + remove(element: T): boolean { + const index = this._list.indexOf(element); + if (index < 0) { + return false; + } + + this._list.splice(index, 1); + if (this._list.length > 0) { + // if removed item was before head, adjust head + if (index < this._pos) { + this._pos -= 1; + } + + if (this._circular && this._pos > this._list.length - 1) { + this._pos = 0; + } + } + return true; + } + + /** + * Reset the head position to the start of the list. + */ + reset() { + this._pos = 0; + } + + /** + * @returns the current head position in the list, 0 indexed. + */ + pos() { + return this._pos; + } + + /** + * @returns the current length of the list + */ + length() { + return this._list.length; + } + + /** + * Clears the list and resets head. + */ + clear() { + this._list = []; + this._pos = 0; + } +} diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts index a590d6f03a..d797a169b1 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -38,10 +38,7 @@ describe('given a one shot initializer', () => { beforeEach(() => { testLogger = new TestLogger(); - initializer = new OneShotInitializerFDv2( - requestor as unknown as Requestor, - testLogger, - ); + initializer = new OneShotInitializerFDv2(requestor as unknown as Requestor, testLogger); }); afterEach(() => { diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index e8afbf53f4..25eb64c0f2 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -238,7 +238,7 @@ export default class LDClientImpl implements LDClient { this._updateProcessor?.start(); } else { // make the FDv2 composite datasource with initializers/synchronizers - const initializers: subsystem.LDSynchronizerFactory[] = []; + const initializers: subsystem.LDDataSourceFactory[] = []; // use one shot initializer for performance and cost initializers.push( @@ -249,7 +249,7 @@ export default class LDClientImpl implements LDClient { ), ); - const synchronizers: subsystem.LDSynchronizerFactory[] = []; + const synchronizers: subsystem.LDDataSourceFactory[] = []; // if streaming is configured, add streaming synchronizer if ( isStandardOptions(config.dataSystem.dataSource) || diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts index 8049511e8d..a148ee11e8 100644 --- a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -15,7 +15,7 @@ import Requestor from './Requestor'; /** * @internal */ -export default class OneShotInitializerFDv2 implements subsystemCommon.DataSystemInitializer { +export default class OneShotInitializerFDv2 implements subsystemCommon.DataSource { constructor( private readonly _requestor: Requestor, private readonly _logger?: LDLogger, @@ -34,7 +34,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSyste const message = httpErrorMessage(err, 'initializer', 'initializer does not retry'); this._logger?.error(message); statusCallback( - subsystemCommon.DataSourceState.Closed, + subsystemCommon.DataSourceState.Off, new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), ); return; @@ -74,14 +74,15 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSyste this._logger, ); + statusCallback(subsystemCommon.DataSourceState.Valid); + payloadProcessor.addPayloadListener((payload) => { dataCallback(payload.basis, payload); }); payloadProcessor.processEvents(parsed.events); - // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API - statusCallback(subsystemCommon.DataSourceState.Valid); + statusCallback(subsystemCommon.DataSourceState.Closed); } catch { // We could not parse this JSON. Report the problem. this._logger?.error('Initialization response contained invalid data'); diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 2f155cc288..1e0055fefb 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -18,7 +18,7 @@ export type PollingErrorHandler = (err: LDPollingError) => void; /** * @internal */ -export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { +export default class PollingProcessorFDv2 implements subsystemCommon.DataSource { private _stopped = false; private _timeoutHandle: any; @@ -51,7 +51,7 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS const message = httpErrorMessage(err, 'polling request'); this._logger?.error(message); statusCallback( - subsystemCommon.DataSourceState.Interrupted, + subsystemCommon.DataSourceState.Off, new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), ); // It is not recoverable, return and do not trigger another poll. @@ -73,6 +73,13 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS if (!body) { this._logger?.warn('Response missing body, will retry.'); + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + 'Response missing body, will retry.', + ), + ); // schedule poll this._timeoutHandle = setTimeout(() => { this._poll(dataCallback, statusCallback); @@ -113,7 +120,7 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS } catch { // We could not parse this JSON. Report the problem and fallthrough to // start another poll. - this._logger?.error('Polling received malformed data'); + this._logger?.error('Malformed JSON data in polling response'); this._logger?.debug(`Malformed JSON follows: ${body}`); statusCallback( subsystemCommon.DataSourceState.Interrupted, diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts index 7378f79562..dfe4a4064e 100644 --- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -18,7 +18,7 @@ import { Flag } from '../evaluation/data/Flag'; import { Segment } from '../evaluation/data/Segment'; import { processFlag, processSegment } from '../store/serialization'; -export default class StreamingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { +export default class StreamingProcessorFDv2 implements subsystemCommon.DataSource { private readonly _headers: { [key: string]: string | string[] }; private readonly _streamUri: string; private readonly _logger?: LDLogger; @@ -82,7 +82,7 @@ export default class StreamingProcessorFDv2 implements subsystemCommon.DataSyste this._logger?.error(httpErrorMessage(err, 'streaming request')); this._logConnectionResult(false); statusCallback( - subsystemCommon.DataSourceState.Closed, + subsystemCommon.DataSourceState.Off, new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status), ); return false; From 65ff5a91087a3ce3b6023ddc07810d44505999fc Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 18 Apr 2025 09:21:40 -0500 Subject: [PATCH 2/4] removing todos and comments --- .../subsystem/DataSystem/CompositeDataSource.test.ts | 2 -- .../common/src/api/subsystem/DataSystem/DataSource.ts | 7 ------- 2 files changed, 9 deletions(-) diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 320f18304b..69412966ea 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -37,8 +37,6 @@ function makeZeroBackoff(): Backoff { }; } -// TODO: go through tests and tune status reporting to verify composite data source is correctly coalescing/masking status during transitions. - it('handles initializer getting basis, switching to synchronizer', async () => { const mockInitializer1 = { start: jest diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index fd9df9cc1c..135bdc95f3 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -12,13 +12,6 @@ export enum DataSourceState { Off, } -// Matthew: include some designation with the Off status that indicates we should fall back to FDv1. -// Expand existing FDv1 polling source and add translation layer in that implementation. -// If FDv1 is also failing, then data system terminates/closes like it would if all FDv2 sources were failing. -// If any FDv2 source indicates to fall back to FDv1, drop all FDv2 sources. - -// Resume here - export interface DataSource { /** * May be called any number of times, if already started, has no effect From 199e5747244f9fc9d0843a4078515373e3094654 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 18 Apr 2025 16:36:54 -0500 Subject: [PATCH 3/4] some more tweaks inspried by integration testing --- .../data_sources/PollingProcessorFDv2.test.ts | 21 ++++++++++++------- .../StreamingProcessorFDv2.test.ts | 2 +- .../data_sources/OneShotInitializerFDv2.ts | 12 ++++------- .../src/data_sources/PollingProcessorFDv2.ts | 10 +++------ 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts index e0d77e5fd2..e9580c83f3 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -59,10 +59,18 @@ describe('given an event processor', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); }); - it('calls callback on success', () => { + it('calls callback on success', async () => { requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); - processor.start(mockDataCallback, mockStatusCallback); - expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + processor.start(dataCallback, mockStatusCallback); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { basis: true, id: `mockId`, state: `mockState`, @@ -179,10 +187,7 @@ describe('given a polling processor with a short poll duration', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, subsystem.DataSourceState.Interrupted, - new LDPollingError( - DataSourceErrorKind.ErrorResponse, - `Malformed JSON data in polling response`, - ), + new LDPollingError(DataSourceErrorKind.ErrorResponse, `Malformed data in polling response`), ); setTimeout(() => { @@ -208,7 +213,7 @@ describe('given a polling processor with a short poll duration', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, - subsystem.DataSourceState.Interrupted, + subsystem.DataSourceState.Off, new LDPollingError( DataSourceErrorKind.ErrorResponse, status === 401 diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts index 4f864bc51b..2c9c983128 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -311,7 +311,7 @@ describe('given a stream processor with mock event source', () => { expect(willRetry).toBeFalsy(); expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, - subsystem.DataSourceState.Closed, + subsystem.DataSourceState.Off, new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status), ); expect(logger.error).toBeCalledWith( diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts index a148ee11e8..8a3699091c 100644 --- a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -41,7 +41,6 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc } if (!body) { - this._logger?.error('One shot initializer response missing body.'); statusCallback( subsystemCommon.DataSourceState.Closed, new LDPollingError( @@ -83,16 +82,13 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc payloadProcessor.processEvents(parsed.events); statusCallback(subsystemCommon.DataSourceState.Closed); - } catch { + } catch (error: any) { // We could not parse this JSON. Report the problem. - this._logger?.error('Initialization response contained invalid data'); - this._logger?.debug(`Malformed JSON follows: ${body}`); + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); statusCallback( subsystemCommon.DataSourceState.Closed, - new LDPollingError( - DataSourceErrorKind.InvalidData, - 'Malformed JSON data in polling response', - ), + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), ); } }); diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 1e0055fefb..4bd2d8648f 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -72,7 +72,6 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource } if (!body) { - this._logger?.warn('Response missing body, will retry.'); statusCallback( subsystemCommon.DataSourceState.Interrupted, new LDPollingError( @@ -120,14 +119,11 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource } catch { // We could not parse this JSON. Report the problem and fallthrough to // start another poll. - this._logger?.error('Malformed JSON data in polling response'); - this._logger?.debug(`Malformed JSON follows: ${body}`); + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); statusCallback( subsystemCommon.DataSourceState.Interrupted, - new LDPollingError( - DataSourceErrorKind.InvalidData, - 'Malformed JSON data in polling response', - ), + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), ); } From dbb432c3c8b71e632d5250b4ec062ec2e05f6a6a Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Tue, 22 Apr 2025 12:03:42 -0500 Subject: [PATCH 4/4] removing DataSourceState.Off --- .../DataSystem/CompositeDataSource.test.ts | 183 +++++++++--------- .../api/subsystem/DataSystem/DataSource.ts | 4 +- .../src/datasource/CompositeDataSource.ts | 11 +- .../data_sources/PollingProcessorFDv2.test.ts | 3 +- .../StreamingProcessorFDv2.test.ts | 9 +- .../data_sources/OneShotInitializerFDv2.ts | 2 +- .../src/data_sources/PollingProcessorFDv2.ts | 4 +- .../data_sources/StreamingProcessorFDv2.ts | 4 +- 8 files changed, 115 insertions(+), 105 deletions(-) diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 69412966ea..d87e7d3c8b 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -230,9 +230,10 @@ it('removes synchronizer that reports unrecoverable error and loops on remaining _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Initializing); - _statusCallback(DataSourceState.Off, { + _statusCallback(DataSourceState.Closed, { name: 'Error', - message: 'I am an unrecoverable error!', // error will lead to culling + message: 'I am an unrecoverable error!', // error will lead to culling, + recoverable: false, }); }, ), @@ -380,93 +381,99 @@ it('reports error when all initializers fail', async () => { ); expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', }); expect(statusCallback).toHaveBeenCalledTimes(4); }); -// it('it reports DataSourceState Off when all synchronizers report Off', async () => { -// const mockInitializer1: DataSource = { -// start: jest -// .fn() -// .mockImplementation( -// ( -// _dataCallback: (basis: boolean, data: any) => void, -// _statusCallback: (status: DataSourceState, err?: any) => void, -// ) => { -// _statusCallback(DataSourceState.Initializing); -// _statusCallback(DataSourceState.Valid); -// _dataCallback(true, { key: 'init1' }); -// _statusCallback(DataSourceState.Closed); -// }, -// ), -// stop: jest.fn(), -// }; - -// const mockSynchronizer1 = { -// start: jest -// .fn() -// .mockImplementation( -// ( -// _dataCallback: (basis: boolean, data: any) => void, -// _statusCallback: (status: DataSourceState, err?: any) => void, -// ) => { -// _statusCallback(DataSourceState.Initializing); -// _statusCallback(DataSourceState.Off, { -// name: 'Error1', -// message: 'I am an unrecoverable error!', -// }); -// }, -// ), -// stop: jest.fn(), -// }; - -// const mockSynchronizer2 = { -// start: jest -// .fn() -// .mockImplementation( -// ( -// _dataCallback: (basis: boolean, data: any) => void, -// _statusCallback: (status: DataSourceState, err?: any) => void, -// ) => { -// _statusCallback(DataSourceState.Initializing); -// _statusCallback(DataSourceState.Off, { -// name: 'Error2', -// message: 'I am an unrecoverable error!', -// }); -// }, -// ), -// stop: jest.fn(), -// }; - -// const underTest = new CompositeDataSource( -// [makeDataSourceFactory(mockInitializer1)], -// [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], -// undefined, -// makeTestTransitionConditions(), -// makeZeroBackoff(), -// ); - -// let statusCallback; -// await new Promise((resolve) => { -// statusCallback = jest.fn((state: DataSourceState, err: any) => { -// if (err && err.name === 'Error2') { -// resolve(); -// } -// }); - -// underTest.start(jest.fn(), statusCallback); -// }); - -// expect(mockInitializer1.start).toHaveBeenCalledTimes(1); -// expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); -// expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); -// expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, null); -// expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, null); -// expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, expect.anything()); -// expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Off, null); -// }); +it('it reports DataSourceState Closed when all synchronizers report Closed with unrecoverable errors', async () => { + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error1', + message: 'I am an unrecoverable error!', + recoverable: false, + }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error2', + message: 'I am an unrecoverable error!', + recoverable: false, + }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((state: DataSourceState, _: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } + }); + + underTest.start(jest.fn(), statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closes properly + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 closed with unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Interrupted, expect.anything()); // sync2 closed with unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`, + }); +}); it('can be stopped when in thrashing synchronizer fallback loop', async () => { const mockInitializer1 = { @@ -639,8 +646,7 @@ it('is well behaved with no initializers and no synchronizers configured', async expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', }); }); @@ -730,8 +736,7 @@ it('is well behaved with an initializer and no synchronizers configured', async expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer got data expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', }); }); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index 135bdc95f3..3b61a088f3 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -6,10 +6,8 @@ export enum DataSourceState { Initializing, // Transient issue, automatic retry is expected Interrupted, - // Data source was closed and will not retry + // Data source was closed and will not retry automatically. Closed, - // This datasource encountered an unrecoverable error and it is not expected to be resolved through trying again in the future - Off, } export interface DataSource { diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index 984246b7e9..e8eb4d0e42 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -102,6 +102,7 @@ export class CompositeDataSource implements DataSource { isPrimary, cullDSFactory, } = this._pickDataSource(lastTransition); + const internalTransitionPromise = new Promise((transitionResolve) => { if (currentDS) { // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after @@ -129,9 +130,9 @@ export class CompositeDataSource implements DataSource { this._logger?.debug( `CompositeDataSource received state ${state} from underlying data source.`, ); - if (err || state === DataSourceState.Closed || state === DataSourceState.Off) { + if (err || state === DataSourceState.Closed) { callbackHandler.disable(); - if (state === DataSourceState.Off) { + if (err.recoverable === false) { // don't use this datasource's factory again cullDSFactory?.(); } @@ -173,7 +174,7 @@ export class CompositeDataSource implements DataSource { transition: 'stop', err: { name: 'ExhaustedDataSources', - message: `CompositeDataSource has exhausted all configured datasources (${this._initFactories.length()} initializers, ${this._syncFactories.length()} synchronizers).`, + message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`, }, }); } @@ -340,7 +341,7 @@ export class CompositeDataSource implements DataSource { * This wrapper will ensure the following: * * Don't report DataSourceState.Initializing except as first status callback. - * Map underlying DataSourceState.Closed and DataSourceState.Off to interrupted. + * Map underlying DataSourceState.Closed to interrupted. * Don't report the same status and error twice in a row. */ private _wrapStatusCallbackWithSanitizer( @@ -353,7 +354,7 @@ export class CompositeDataSource implements DataSource { return (status: DataSourceState, err?: any) => { let sanitized = status; // underlying errors, closed state, or off are masked as interrupted while we transition - if (status === DataSourceState.Closed || status === DataSourceState.Off) { + if (status === DataSourceState.Closed) { sanitized = DataSourceState.Interrupted; } diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts index e9580c83f3..3cadd04e28 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -213,13 +213,14 @@ describe('given a polling processor with a short poll duration', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, - subsystem.DataSourceState.Off, + subsystem.DataSourceState.Closed, new LDPollingError( DataSourceErrorKind.ErrorResponse, status === 401 ? `Received error ${status} (invalid SDK key) for polling request - giving up permanently` : `Received error ${status} for polling request - giving up permanently`, status as number, + false, ), ); setTimeout(() => { diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts index 2c9c983128..39aa6a02a1 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -311,8 +311,13 @@ describe('given a stream processor with mock event source', () => { expect(willRetry).toBeFalsy(); expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, - subsystem.DataSourceState.Off, - new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status), + subsystem.DataSourceState.Closed, + new LDStreamingError( + DataSourceErrorKind.Unknown, + testError.message, + testError.status, + true, + ), ); expect(logger.error).toBeCalledWith( expect.stringMatching(new RegExp(`${status}.*permanently`)), diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts index 8a3699091c..d511361228 100644 --- a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -34,7 +34,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc const message = httpErrorMessage(err, 'initializer', 'initializer does not retry'); this._logger?.error(message); statusCallback( - subsystemCommon.DataSourceState.Off, + subsystemCommon.DataSourceState.Closed, new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), ); return; diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 4bd2d8648f..dc0f9da02f 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -51,8 +51,8 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource const message = httpErrorMessage(err, 'polling request'); this._logger?.error(message); statusCallback( - subsystemCommon.DataSourceState.Off, - new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status, false), ); // It is not recoverable, return and do not trigger another poll. return; diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts index dfe4a4064e..87c609acd6 100644 --- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -82,8 +82,8 @@ export default class StreamingProcessorFDv2 implements subsystemCommon.DataSourc this._logger?.error(httpErrorMessage(err, 'streaming request')); this._logConnectionResult(false); statusCallback( - subsystemCommon.DataSourceState.Off, - new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status), + subsystemCommon.DataSourceState.Closed, + new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status, false), ); return false; }