diff --git a/packages/shared/common/__tests__/datasource/dataSourceList.test.ts b/packages/shared/common/__tests__/datasource/dataSourceList.test.ts new file mode 100644 index 0000000000..398eb7b503 --- /dev/null +++ b/packages/shared/common/__tests__/datasource/dataSourceList.test.ts @@ -0,0 +1,127 @@ +import { DataSourceList } from '../../src/datasource/dataSourceList'; + +it('replace is well behaved', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + underTest.replace([4, 5, 6]); + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); +}); + +it('it cycles correctly after replacing non-empty list', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(2); + + underTest.remove(2); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); + + underTest.replace([4, 5, 6]); + + expect(underTest.next()).toEqual(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + + underTest.remove(4); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(6); + + underTest.remove(6); + expect(underTest.next()).toEqual(5); + expect(underTest.next()).toEqual(5); + + underTest.remove(5); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); +}); + +it('cycles correctly after replacing empty list', async () => { + const underTest = new DataSourceList(true, []); + + underTest.replace([1, 2, 3]); + + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + + underTest.remove(3); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(2); + + underTest.remove(2); + expect(underTest.next()).toBeUndefined(); + expect(underTest.next()).toBeUndefined(); +}); + +it('removing head is well behaved at start', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + // head is now pointing to 1 + underTest.remove(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(2); +}); + +it('removing head is well behaved in middle', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + // head is now pointing to 2 + underTest.remove(2); + expect(underTest.next()).toEqual(3); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(3); +}); + +it('removing head is well behaved at end', async () => { + const underTest = new DataSourceList(true, [1, 2, 3]); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + // head is now pointing to 3 + underTest.remove(3); + expect(underTest.next()).toEqual(1); + expect(underTest.next()).toEqual(2); + expect(underTest.next()).toEqual(1); +}); + +it('removing existing returns true', async () => { + const underTest = new DataSourceList(true, [1]); + expect(underTest.remove(1)).toEqual(true); + expect(underTest.next()).toBeUndefined(); +}); + +it('removing nonexistent returns false', async () => { + const underTest = new DataSourceList(true, []); + expect(underTest.remove(1)).toEqual(false); + expect(underTest.next()).toBeUndefined(); +}); + +it('single element removed and next called', async () => { + const underTest = new DataSourceList(true, [1]); + expect(underTest.remove(1)).toEqual(true); + expect(underTest.next()).toBeUndefined(); +}); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index a83025808f..d87e7d3c8b 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1,9 +1,7 @@ import { + DataSource, DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, } from '../../../src/api/subsystem/DataSystem/DataSource'; import { Backoff } from '../../../src/datasource/Backoff'; import { @@ -11,31 +9,19 @@ import { TransitionConditions, } from '../../../src/datasource/CompositeDataSource'; -function makeInitializerFactory(internal: DataSystemInitializer): LDInitializerFactory { - return () => internal; -} - -function makeSynchronizerFactory(internal: DataSystemSynchronizer): LDSynchronizerFactory { +function makeDataSourceFactory(internal: DataSource): LDDataSourceFactory { return () => internal; } function makeTestTransitionConditions(): TransitionConditions { return { - [DataSourceState.Initializing]: { - durationMS: 0, - transition: 'fallback', - }, [DataSourceState.Interrupted]: { durationMS: 0, transition: 'fallback', }, - [DataSourceState.Closed]: { - durationMS: 0, - transition: 'fallback', - }, [DataSourceState.Valid]: { durationMS: 0, - transition: 'fallback', + transition: 'recover', }, }; } @@ -51,7 +37,7 @@ function makeZeroBackoff(): Backoff { }; } -it('handles initializer getting basis, switching to syncrhonizer', async () => { +it('handles initializer getting basis, switching to synchronizer', async () => { const mockInitializer1 = { start: jest .fn() @@ -60,7 +46,10 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), @@ -75,6 +64,8 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); }, ), @@ -82,33 +73,39 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); - let callback; + let dataCallback; + const statusCallback = jest.fn(); await new Promise((resolve) => { - callback = jest.fn((_: boolean, data: any) => { + dataCallback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { resolve(); } }); - underTest.start(callback, jest.fn()); + underTest.start(dataCallback, statusCallback); }); expect(mockInitializer1.start).toHaveBeenCalledTimes(1); expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); - expect(callback).toHaveBeenCalledTimes(2); - expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); + expect(dataCallback).toHaveBeenCalledTimes(2); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(dataCallback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); + expect(statusCallback).toHaveBeenCalledTimes(4); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Valid, undefined); }); it('handles initializer getting basis, switches to synchronizer 1, falls back to synchronizer 2, recovers to synchronizer 1', async () => { - const mockInitializer1: DataSystemInitializer = { + const mockInitializer1: DataSource = { start: jest .fn() .mockImplementation( @@ -116,7 +113,10 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), @@ -133,11 +133,14 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _statusCallback: (status: DataSourceState, err?: any) => void, ) => { if (sync1RunCount === 0) { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, { name: 'Error', message: 'I am error...man!', }); // error that will lead to fallback } else { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); // second start will lead to data } sync1RunCount += 1; @@ -155,22 +158,24 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _dataCallback(false, mockSynchronizer2Data); + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Valid, null); // this should lead to recovery + _dataCallback(false, mockSynchronizer2Data); }, ), stop: jest.fn(), }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let callback; + const statusCallback = jest.fn(); await new Promise((resolve) => { callback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { @@ -178,7 +183,7 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to } }); - underTest.start(callback, jest.fn()); + underTest.start(callback, statusCallback); }); expect(mockInitializer1.start).toHaveBeenCalledTimes(1); @@ -188,6 +193,117 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); // sync1 errors and fallsback expect(callback).toHaveBeenNthCalledWith(3, false, { key: 'sync1' }); // sync2 recovers back to sync1 + expect(statusCallback).toHaveBeenCalledTimes(7); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closed + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined); // sync2 got data + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Interrupted, undefined); // recover to sync1 + expect(statusCallback).toHaveBeenNthCalledWith(7, DataSourceState.Valid, undefined); // sync1 valid +}); + +it('removes synchronizer that reports unrecoverable error and loops on remaining synchronizer', async () => { + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: 'I am an unrecoverable error!', // error will lead to culling, + recoverable: false, + }); + }, + ), + stop: jest.fn(), + }; + + let sync2RunCount = 0; + const mockSynchronizer2Data = { key: 'sync2' }; + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + if (sync2RunCount < 5) { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: `I am a recoverable error ${sync2RunCount}`, + }); // error that will lead to fallback + } else { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(false, mockSynchronizer2Data); // second start will lead to data + } + sync2RunCount += 1; + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + const statusCallback = jest.fn(); + await new Promise((resolve) => { + dataCallback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer2Data) { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); // only called the first time + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(6); // called 5 times with recoverable errors then 6th time succeeds + expect(dataCallback).toHaveBeenCalledTimes(2); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(dataCallback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); + expect(statusCallback).toHaveBeenCalledTimes(10); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closed + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(7, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(8, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(9, DataSourceState.Interrupted, expect.anything()); // sync2 recoverable error + expect(statusCallback).toHaveBeenNthCalledWith(10, DataSourceState.Valid, undefined); // sync1 valid }); it('reports error when all initializers fail', async () => { @@ -195,7 +311,7 @@ it('reports error when all initializers fail', async () => { name: 'Error', message: 'I am initializer1 error!', }; - const mockInitializer1: DataSystemInitializer = { + const mockInitializer1: DataSource = { start: jest .fn() .mockImplementation( @@ -203,6 +319,7 @@ it('reports error when all initializers fail', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, mockInitializer1Error); }, ), @@ -213,7 +330,7 @@ it('reports error when all initializers fail', async () => { name: 'Error', message: 'I am initializer2 error!', }; - const mockInitializer2: DataSystemInitializer = { + const mockInitializer2: DataSource = { start: jest .fn() .mockImplementation( @@ -221,6 +338,7 @@ it('reports error when all initializers fail', async () => { _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); _statusCallback(DataSourceState.Closed, mockInitializer2Error); }, ), @@ -228,7 +346,7 @@ it('reports error when all initializers fail', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], + [makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)], [], // no synchronizers for this test undefined, makeTestTransitionConditions(), @@ -250,22 +368,111 @@ it('reports error when all initializers fail', async () => { expect(mockInitializer1.start).toHaveBeenCalledTimes(1); expect(mockInitializer2.start).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledTimes(0); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); expect(statusCallback).toHaveBeenNthCalledWith( - 1, + 2, DataSourceState.Interrupted, mockInitializer1Error, ); expect(statusCallback).toHaveBeenNthCalledWith( - 2, + 3, DataSourceState.Interrupted, mockInitializer2Error, ); - expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', + }); + expect(statusCallback).toHaveBeenCalledTimes(4); +}); + +it('it reports DataSourceState Closed when all synchronizers report Closed with unrecoverable errors', async () => { + const mockInitializer1: DataSource = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); + _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error1', + message: 'I am an unrecoverable error!', + recoverable: false, + }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Closed, { + name: 'Error2', + message: 'I am an unrecoverable error!', + recoverable: false, + }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((state: DataSourceState, _: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } + }); + + underTest.start(jest.fn(), statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closes properly + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 closed with unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Interrupted, expect.anything()); // sync2 closed with unrecoverable error + expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`, }); - expect(statusCallback).toHaveBeenCalledTimes(3); }); it('can be stopped when in thrashing synchronizer fallback loop', async () => { @@ -299,8 +506,8 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], // will continuously fallback onto itself undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -309,8 +516,8 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { const dataCallback = jest.fn(); let statusCallback; await new Promise((resolve) => { - statusCallback = jest.fn((state: DataSourceState, _: any) => { - if (state === DataSourceState.Interrupted) { + statusCallback = jest.fn((state: DataSourceState, err: any) => { + if (state === DataSourceState.Interrupted && err === mockSynchronizer1Error) { resolve(); // waiting interruption due to sync error } }); @@ -328,12 +535,14 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { setTimeout(f, 100); }); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Interrupted, undefined); // initializer closes expect(statusCallback).toHaveBeenNthCalledWith( - 1, + 3, DataSourceState.Interrupted, mockSynchronizer1Error, - ); - expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, undefined); + ); // synchronizer error + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, undefined); // stop composite source }); it('can be stopped and restarted', async () => { @@ -368,8 +577,8 @@ it('can be stopped and restarted', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), @@ -425,17 +634,19 @@ it('is well behaved with no initializers and no synchronizers configured', async let statusCallback; await new Promise((resolve) => { - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } }); underTest.start(jest.fn(), statusCallback); }); - expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', }); }); @@ -449,6 +660,8 @@ it('is well behaved with no initializer and synchronizer configured', async () = _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(false, mockSynchronizer1Data); }, ), @@ -457,22 +670,25 @@ it('is well behaved with no initializer and synchronizer configured', async () = const underTest = new CompositeDataSource( [], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let dataCallback; + const statusCallback = jest.fn(); await new Promise((resolve) => { dataCallback = jest.fn(() => { resolve(); }); - underTest.start(dataCallback, jest.fn()); + underTest.start(dataCallback, statusCallback); }); expect(dataCallback).toHaveBeenNthCalledWith(1, false, { key: 'sync1' }); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); }); it('is well behaved with an initializer and no synchronizers configured', async () => { @@ -484,14 +700,17 @@ it('is well behaved with an initializer and no synchronizers configured', async _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _dataCallback(true, { key: 'init1' }); + _statusCallback(DataSourceState.Closed); }, ), stop: jest.fn(), }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], + [makeDataSourceFactory(mockInitializer1)], [], undefined, makeTestTransitionConditions(), @@ -502,18 +721,22 @@ it('is well behaved with an initializer and no synchronizers configured', async let statusCallback; await new Promise((resolve) => { dataCallback = jest.fn(); - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Closed) { + resolve(); + } }); underTest.start(dataCallback, statusCallback); }); expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer got data + expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: - 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', + message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.', }); }); @@ -541,6 +764,7 @@ it('consumes cancellation tokens correctly', async () => { _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Valid); _statusCallback(DataSourceState.Interrupted); // report interrupted to schedule automatic transition and create cancellation token }, ), @@ -548,11 +772,11 @@ it('consumes cancellation tokens correctly', async () => { }; const underTest = new CompositeDataSource( - [makeInitializerFactory(mockInitializer1)], - [makeSynchronizerFactory(mockSynchronizer1)], + [makeDataSourceFactory(mockInitializer1)], + [makeDataSourceFactory(mockSynchronizer1)], undefined, { - // pass in transition condition of 0 so that it will thrash, generating cancellation tokens repeatedly + // pass in transition condition so that it will thrash, generating cancellation tokens repeatedly [DataSourceState.Interrupted]: { durationMS: 100, transition: 'fallback', @@ -566,11 +790,13 @@ it('consumes cancellation tokens correctly', async () => { let interruptedCount = 0; await new Promise((resolve) => { dataCallback = jest.fn(); - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - interruptedCount += 1; - if (interruptedCount > 10) { - // let it thrash for N iterations - resolve(); + statusCallback = jest.fn((state: DataSourceState, _2: any) => { + if (state === DataSourceState.Interrupted) { + interruptedCount += 1; + if (interruptedCount > 10) { + // let it thrash for N iterations + resolve(); + } } }); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index f7a5834761..3b61a088f3 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -1,12 +1,12 @@ // TODO: refactor client-sdk to use this enum export enum DataSourceState { - // Spinning up to make first connection attempt - Initializing, // Positive confirmation of connection/data receipt Valid, + // Spinning up to make first connection attempt + Initializing, // Transient issue, automatic retry is expected Interrupted, - // Permanent issue, external intervention required + // Data source was closed and will not retry automatically. Closed, } @@ -28,16 +28,4 @@ export interface DataSource { stop(): void; } -export type LDInitializerFactory = () => DataSystemInitializer; - -export type LDSynchronizerFactory = () => DataSystemSynchronizer; - -/** - * A data source that can be used to fetch the basis. - */ -export interface DataSystemInitializer extends DataSource {} - -/** - * A data source that can be used to fetch the basis or ongoing data changes. - */ -export interface DataSystemSynchronizer extends DataSource {} +export type LDDataSourceFactory = () => DataSource; diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts index 3ad2737e2d..0b9e1141b8 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/index.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -1,8 +1 @@ -export { - DataSource, - DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, -} from './DataSource'; +export { DataSource, DataSourceState, LDDataSourceFactory } from './DataSource'; diff --git a/packages/shared/common/src/api/subsystem/index.ts b/packages/shared/common/src/api/subsystem/index.ts index 7dc2af969b..bace160f64 100644 --- a/packages/shared/common/src/api/subsystem/index.ts +++ b/packages/shared/common/src/api/subsystem/index.ts @@ -1,11 +1,4 @@ -import { - DataSource, - DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, -} from './DataSystem'; +import { DataSource, DataSourceState, LDDataSourceFactory } from './DataSystem'; import LDContextDeduplicator from './LDContextDeduplicator'; import LDEventProcessor from './LDEventProcessor'; import LDEventSender, { LDDeliveryStatus, LDEventSenderResult, LDEventType } from './LDEventSender'; @@ -14,10 +7,7 @@ import { LDStreamProcessor } from './LDStreamProcessor'; export { DataSource, DataSourceState, - DataSystemInitializer, - DataSystemSynchronizer, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, LDEventProcessor, LDContextDeduplicator, LDEventSender, diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index d831463124..e8eb4d0e42 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -4,10 +4,10 @@ import { CallbackHandler } from '../api/subsystem/DataSystem/CallbackHandler'; import { DataSource, DataSourceState, - LDInitializerFactory, - LDSynchronizerFactory, + LDDataSourceFactory, } from '../api/subsystem/DataSystem/DataSource'; import { Backoff, DefaultBackoff } from './Backoff'; +import { DataSourceList } from './dataSourceList'; const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; @@ -15,7 +15,7 @@ const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; /** * Represents a transition between data sources. */ -export type Transition = 'none' | 'switchToSync' | 'fallback' | 'recover' | 'stop'; +export type Transition = 'switchToSync' | 'fallback' | 'recover' | 'stop'; /** * Given a {@link DataSourceState}, how long to wait before transitioning. @@ -38,7 +38,8 @@ export class CompositeDataSource implements DataSource { // TODO: SDK-1044 utilize selector from initializers private _initPhaseActive: boolean; - private _currentPosition: number; + private _initFactories: DataSourceList; + private _syncFactories: DataSourceList; private _stopped: boolean = true; private _externalTransitionPromise: Promise; @@ -46,12 +47,12 @@ export class CompositeDataSource implements DataSource { private _cancelTokens: (() => void)[] = []; /** - * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. - * @param _synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. + * @param initializers factories to create {@link DataSystemInitializer}s, in priority order. + * @param synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. */ constructor( - private readonly _initializers: LDInitializerFactory[], - private readonly _synchronizers: LDSynchronizerFactory[], + initializers: LDDataSourceFactory[], + synchronizers: LDDataSourceFactory[], private readonly _logger?: LDLogger, private readonly _transitionConditions: TransitionConditions = { [DataSourceState.Valid]: { @@ -63,16 +64,14 @@ export class CompositeDataSource implements DataSource { transition: 'fallback', }, }, - private readonly _backoff: Backoff = new DefaultBackoff( - 1000, // TODO SDK-1137: handle blacklisting perpetually failing sources - 30000, - ), + private readonly _backoff: Backoff = new DefaultBackoff(1000, 30000), ) { this._externalTransitionPromise = new Promise((resolveTransition) => { this._externalTransitionResolve = resolveTransition; }); - this._initPhaseActive = _initializers.length > 0; // init phase if we have initializers - this._currentPosition = 0; + this._initPhaseActive = initializers.length > 0; // init phase if we have initializers + this._initFactories = new DataSourceList(false, initializers); + this._syncFactories = new DataSourceList(true, synchronizers); } async start( @@ -87,12 +86,23 @@ export class CompositeDataSource implements DataSource { this._stopped = false; this._logger?.debug( - `CompositeDataSource starting with (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + `CompositeDataSource starting with (${this._initFactories.length()} initializers, ${this._syncFactories.length()} synchronizers).`, ); - let lastTransition: Transition = 'none'; + + // this wrapper turns status updates from underlying data sources into a valid series of status updates for the consumer of this + // composite data source + const sanitizedStatusCallback = this._wrapStatusCallbackWithSanitizer(statusCallback); + sanitizedStatusCallback(DataSourceState.Initializing); + + let lastTransition: Transition | undefined; // eslint-disable-next-line no-constant-condition while (true) { - const currentDS: DataSource | undefined = this._pickDataSource(lastTransition); + const { + dataSource: currentDS, + isPrimary, + cullDSFactory, + } = this._pickDataSource(lastTransition); + const internalTransitionPromise = new Promise((transitionResolve) => { if (currentDS) { // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after @@ -109,6 +119,7 @@ export class CompositeDataSource implements DataSource { // transition to sync if we get basis during init callbackHandler.disable(); this._consumeCancelToken(cancelScheduledTransition); + sanitizedStatusCallback(DataSourceState.Interrupted); transitionResolve({ transition: 'switchToSync' }); } }, @@ -121,16 +132,21 @@ export class CompositeDataSource implements DataSource { ); if (err || state === DataSourceState.Closed) { callbackHandler.disable(); - statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition + if (err.recoverable === false) { + // don't use this datasource's factory again + cullDSFactory?.(); + } + sanitizedStatusCallback(state, err); this._consumeCancelToken(cancelScheduledTransition); transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback } else { - statusCallback(state, null); // report the status upward + sanitizedStatusCallback(state); if (state !== lastState) { lastState = state; this._consumeCancelToken(cancelScheduledTransition); // cancel previously scheduled status transition if one was scheduled - const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it - const condition = this._lookupTransitionCondition(state, excludeRecovery); + + // primary source cannot recover to itself, so exclude it + const condition = this._lookupTransitionCondition(state, isPrimary); if (condition) { const { promise, cancel } = this._cancellableDelay(condition.durationMS); cancelScheduledTransition = cancel; @@ -138,6 +154,7 @@ export class CompositeDataSource implements DataSource { promise.then(() => { this._consumeCancelToken(cancel); callbackHandler.disable(); + sanitizedStatusCallback(DataSourceState.Interrupted); transitionResolve({ transition: condition.transition }); }); } else { @@ -157,7 +174,7 @@ export class CompositeDataSource implements DataSource { transition: 'stop', err: { name: 'ExhaustedDataSources', - message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`, }, }); } @@ -192,14 +209,12 @@ export class CompositeDataSource implements DataSource { this._consumeCancelToken(cancelDelay); } + lastTransition = transitionRequest.transition; if (transitionRequest.transition === 'stop') { - // exit the loop + // exit the loop, this is intentionally not the sanitized status callback statusCallback(DataSourceState.Closed, transitionRequest.err); - lastTransition = transitionRequest.transition; break; } - - lastTransition = transitionRequest.transition; } // reset so that run can be called again in the future @@ -214,52 +229,70 @@ export class CompositeDataSource implements DataSource { private _reset() { this._stopped = true; - this._initPhaseActive = this._initializers.length > 0; // init phase if we have initializers; - this._currentPosition = 0; + this._initPhaseActive = this._initFactories.length() > 0; // init phase if we have initializers; + this._initFactories.reset(); + this._syncFactories.reset(); this._externalTransitionPromise = new Promise((tr) => { this._externalTransitionResolve = tr; }); // intentionally not resetting the backoff to avoid a code path that could circumvent throttling } - private _pickDataSource(transition: Transition | undefined): DataSource | undefined { + /** + * Determines the next datasource and returns that datasource as well as a closure to cull the + * datasource from the datasource lists. One example where the cull closure is invoked is if the + * datasource has an unrecoverable error. + */ + private _pickDataSource(transition?: Transition): { + dataSource: DataSource | undefined; + isPrimary: boolean; + cullDSFactory: (() => void) | undefined; + } { + let factory: LDDataSourceFactory | undefined; + let isPrimary: boolean; switch (transition) { case 'switchToSync': this._initPhaseActive = false; // one way toggle to false, unless this class is reset() - this._currentPosition = 0; - break; - case 'fallback': - this._currentPosition += 1; + this._syncFactories.reset(); + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); break; case 'recover': - this._currentPosition = 0; + if (this._initPhaseActive) { + this._initFactories.reset(); + isPrimary = this._initFactories.pos() === 0; + factory = this._initFactories.next(); + } else { + this._syncFactories.reset(); + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); + } break; - case 'none': + case 'fallback': default: - // don't do anything in this case + if (this._initPhaseActive) { + isPrimary = this._initFactories.pos() === 0; + factory = this._initFactories.next(); + } else { + isPrimary = this._syncFactories.pos() === 0; + factory = this._syncFactories.next(); + } break; } - if (this._initPhaseActive) { - // We don't loop back through initializers, so if outside range of initializers, instead return undefined. - if (this._currentPosition > this._initializers.length - 1) { - return undefined; - } - - return this._initializers[this._currentPosition](); + if (!factory) { + return { dataSource: undefined, isPrimary, cullDSFactory: undefined }; } - // getting here indicates we are using a synchronizer - // if no synchronizers, return undefined - if (this._synchronizers.length <= 0) { - return undefined; - } - this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start if necessary - if (this._currentPosition > this._synchronizers.length - 1) { - // this is only possible if no synchronizers were provided - return undefined; - } - return this._synchronizers[this._currentPosition](); + return { + dataSource: factory(), + isPrimary, + cullDSFactory: () => { + if (factory) { + this._syncFactories.remove(factory); + } + }, + }; } /** @@ -273,7 +306,7 @@ export class CompositeDataSource implements DataSource { const condition = this._transitionConditions[state]; // exclude recovery can happen for certain initializers/synchronizers (ex: the primary synchronizer shouldn't recover to itself) - if (!condition || (excludeRecover && condition.transition === 'recover')) { + if (excludeRecover && condition?.transition === 'recover') { return undefined; } @@ -303,4 +336,44 @@ export class CompositeDataSource implements DataSource { this._cancelTokens.splice(index, 1); } } + + /** + * This wrapper will ensure the following: + * + * Don't report DataSourceState.Initializing except as first status callback. + * Map underlying DataSourceState.Closed to interrupted. + * Don't report the same status and error twice in a row. + */ + private _wrapStatusCallbackWithSanitizer( + statusCallback: (status: DataSourceState, err?: any) => void, + ): (status: DataSourceState, err?: any) => void { + let alreadyReportedInitializing = false; + let lastStatus: DataSourceState | undefined; + let lastErr: any; + + return (status: DataSourceState, err?: any) => { + let sanitized = status; + // underlying errors, closed state, or off are masked as interrupted while we transition + if (status === DataSourceState.Closed) { + sanitized = DataSourceState.Interrupted; + } + + // don't report the same combination of values twice in a row + if (sanitized === lastStatus && err === lastErr) { + return; + } + + if (sanitized === DataSourceState.Initializing) { + // don't report initializing again if that has already been reported + if (alreadyReportedInitializing) { + return; + } + alreadyReportedInitializing = true; + } + + lastStatus = sanitized; + lastErr = err; + statusCallback(sanitized, err); + }; + } } diff --git a/packages/shared/common/src/datasource/dataSourceList.ts b/packages/shared/common/src/datasource/dataSourceList.ts new file mode 100644 index 0000000000..e5e740265e --- /dev/null +++ b/packages/shared/common/src/datasource/dataSourceList.ts @@ -0,0 +1,102 @@ +/** + * Helper class for {@link CompositeDataSource} to manage iterating on data sources and removing them on the fly. + */ +export class DataSourceList { + private _list: T[]; + private _circular: boolean; + private _pos: number; + + /** + * @param circular whether to loop off the end of the list back to the start + * @param initialList of content + */ + constructor(circular: boolean, initialList?: T[]) { + this._list = initialList ? [...initialList] : []; + this._circular = circular; + this._pos = 0; + } + + /** + * Returns the current head and then iterates. + */ + next(): T | undefined { + if (this._list.length <= 0 || this._pos >= this._list.length) { + return undefined; + } + + const result = this._list[this._pos]; + + if (this._circular) { + this._pos = (this._pos + 1) % this._list.length; + } else { + this._pos += 1; + } + + return result; + } + + /** + * Replaces all elements with the provided list and resets the position of head to the start. + * + * @param input that will replace existing list + */ + replace(input: T[]): void { + this._list = [...input]; + this._pos = 0; + } + + /** + * Removes the provided element from the list. If the removed element was the head, head moves to next. Consider head may be undefined if list is empty after removal. + * + * @param element to remove + * @returns true if element was removed + */ + remove(element: T): boolean { + const index = this._list.indexOf(element); + if (index < 0) { + return false; + } + + this._list.splice(index, 1); + if (this._list.length > 0) { + // if removed item was before head, adjust head + if (index < this._pos) { + this._pos -= 1; + } + + if (this._circular && this._pos > this._list.length - 1) { + this._pos = 0; + } + } + return true; + } + + /** + * Reset the head position to the start of the list. + */ + reset() { + this._pos = 0; + } + + /** + * @returns the current head position in the list, 0 indexed. + */ + pos() { + return this._pos; + } + + /** + * @returns the current length of the list + */ + length() { + return this._list.length; + } + + /** + * Clears the list and resets head. + */ + clear() { + this._list = []; + this._pos = 0; + } +} diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts index a590d6f03a..d797a169b1 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -38,10 +38,7 @@ describe('given a one shot initializer', () => { beforeEach(() => { testLogger = new TestLogger(); - initializer = new OneShotInitializerFDv2( - requestor as unknown as Requestor, - testLogger, - ); + initializer = new OneShotInitializerFDv2(requestor as unknown as Requestor, testLogger); }); afterEach(() => { diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts index e0d77e5fd2..3cadd04e28 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -59,10 +59,18 @@ describe('given an event processor', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); }); - it('calls callback on success', () => { + it('calls callback on success', async () => { requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); - processor.start(mockDataCallback, mockStatusCallback); - expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + processor.start(dataCallback, mockStatusCallback); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { basis: true, id: `mockId`, state: `mockState`, @@ -179,10 +187,7 @@ describe('given a polling processor with a short poll duration', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, subsystem.DataSourceState.Interrupted, - new LDPollingError( - DataSourceErrorKind.ErrorResponse, - `Malformed JSON data in polling response`, - ), + new LDPollingError(DataSourceErrorKind.ErrorResponse, `Malformed data in polling response`), ); setTimeout(() => { @@ -208,13 +213,14 @@ describe('given a polling processor with a short poll duration', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, - subsystem.DataSourceState.Interrupted, + subsystem.DataSourceState.Closed, new LDPollingError( DataSourceErrorKind.ErrorResponse, status === 401 ? `Received error ${status} (invalid SDK key) for polling request - giving up permanently` : `Received error ${status} for polling request - giving up permanently`, status as number, + false, ), ); setTimeout(() => { diff --git a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts index 4f864bc51b..39aa6a02a1 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts @@ -312,7 +312,12 @@ describe('given a stream processor with mock event source', () => { expect(mockStatusCallback).toHaveBeenNthCalledWith( 2, subsystem.DataSourceState.Closed, - new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status), + new LDStreamingError( + DataSourceErrorKind.Unknown, + testError.message, + testError.status, + true, + ), ); expect(logger.error).toBeCalledWith( expect.stringMatching(new RegExp(`${status}.*permanently`)), diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index e8afbf53f4..25eb64c0f2 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -238,7 +238,7 @@ export default class LDClientImpl implements LDClient { this._updateProcessor?.start(); } else { // make the FDv2 composite datasource with initializers/synchronizers - const initializers: subsystem.LDSynchronizerFactory[] = []; + const initializers: subsystem.LDDataSourceFactory[] = []; // use one shot initializer for performance and cost initializers.push( @@ -249,7 +249,7 @@ export default class LDClientImpl implements LDClient { ), ); - const synchronizers: subsystem.LDSynchronizerFactory[] = []; + const synchronizers: subsystem.LDDataSourceFactory[] = []; // if streaming is configured, add streaming synchronizer if ( isStandardOptions(config.dataSystem.dataSource) || diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts index 8049511e8d..d511361228 100644 --- a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -15,7 +15,7 @@ import Requestor from './Requestor'; /** * @internal */ -export default class OneShotInitializerFDv2 implements subsystemCommon.DataSystemInitializer { +export default class OneShotInitializerFDv2 implements subsystemCommon.DataSource { constructor( private readonly _requestor: Requestor, private readonly _logger?: LDLogger, @@ -41,7 +41,6 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSyste } if (!body) { - this._logger?.error('One shot initializer response missing body.'); statusCallback( subsystemCommon.DataSourceState.Closed, new LDPollingError( @@ -74,24 +73,22 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSyste this._logger, ); + statusCallback(subsystemCommon.DataSourceState.Valid); + payloadProcessor.addPayloadListener((payload) => { dataCallback(payload.basis, payload); }); payloadProcessor.processEvents(parsed.events); - // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API - statusCallback(subsystemCommon.DataSourceState.Valid); - } catch { + statusCallback(subsystemCommon.DataSourceState.Closed); + } catch (error: any) { // We could not parse this JSON. Report the problem. - this._logger?.error('Initialization response contained invalid data'); - this._logger?.debug(`Malformed JSON follows: ${body}`); + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); statusCallback( subsystemCommon.DataSourceState.Closed, - new LDPollingError( - DataSourceErrorKind.InvalidData, - 'Malformed JSON data in polling response', - ), + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), ); } }); diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts index 2f155cc288..dc0f9da02f 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -18,7 +18,7 @@ export type PollingErrorHandler = (err: LDPollingError) => void; /** * @internal */ -export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { +export default class PollingProcessorFDv2 implements subsystemCommon.DataSource { private _stopped = false; private _timeoutHandle: any; @@ -51,8 +51,8 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS const message = httpErrorMessage(err, 'polling request'); this._logger?.error(message); statusCallback( - subsystemCommon.DataSourceState.Interrupted, - new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status, false), ); // It is not recoverable, return and do not trigger another poll. return; @@ -72,7 +72,13 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS } if (!body) { - this._logger?.warn('Response missing body, will retry.'); + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + 'Response missing body, will retry.', + ), + ); // schedule poll this._timeoutHandle = setTimeout(() => { this._poll(dataCallback, statusCallback); @@ -113,14 +119,11 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSystemS } catch { // We could not parse this JSON. Report the problem and fallthrough to // start another poll. - this._logger?.error('Polling received malformed data'); - this._logger?.debug(`Malformed JSON follows: ${body}`); + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); statusCallback( subsystemCommon.DataSourceState.Interrupted, - new LDPollingError( - DataSourceErrorKind.InvalidData, - 'Malformed JSON data in polling response', - ), + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), ); } diff --git a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts index 7378f79562..87c609acd6 100644 --- a/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts @@ -18,7 +18,7 @@ import { Flag } from '../evaluation/data/Flag'; import { Segment } from '../evaluation/data/Segment'; import { processFlag, processSegment } from '../store/serialization'; -export default class StreamingProcessorFDv2 implements subsystemCommon.DataSystemSynchronizer { +export default class StreamingProcessorFDv2 implements subsystemCommon.DataSource { private readonly _headers: { [key: string]: string | string[] }; private readonly _streamUri: string; private readonly _logger?: LDLogger; @@ -83,7 +83,7 @@ export default class StreamingProcessorFDv2 implements subsystemCommon.DataSyste this._logConnectionResult(false); statusCallback( subsystemCommon.DataSourceState.Closed, - new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status), + new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status, false), ); return false; }