Skip to content

Commit dbb432c

Browse files
Todd AndersonTodd Anderson
authored andcommitted
removing DataSourceState.Off
1 parent 199e574 commit dbb432c

File tree

8 files changed

+115
-105
lines changed

8 files changed

+115
-105
lines changed

packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts

Lines changed: 94 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,10 @@ it('removes synchronizer that reports unrecoverable error and loops on remaining
230230
_statusCallback: (status: DataSourceState, err?: any) => void,
231231
) => {
232232
_statusCallback(DataSourceState.Initializing);
233-
_statusCallback(DataSourceState.Off, {
233+
_statusCallback(DataSourceState.Closed, {
234234
name: 'Error',
235-
message: 'I am an unrecoverable error!', // error will lead to culling
235+
message: 'I am an unrecoverable error!', // error will lead to culling,
236+
recoverable: false,
236237
});
237238
},
238239
),
@@ -380,93 +381,99 @@ it('reports error when all initializers fail', async () => {
380381
);
381382
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, {
382383
name: 'ExhaustedDataSources',
383-
message:
384-
'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).',
384+
message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.',
385385
});
386386
expect(statusCallback).toHaveBeenCalledTimes(4);
387387
});
388388

389-
// it('it reports DataSourceState Off when all synchronizers report Off', async () => {
390-
// const mockInitializer1: DataSource = {
391-
// start: jest
392-
// .fn()
393-
// .mockImplementation(
394-
// (
395-
// _dataCallback: (basis: boolean, data: any) => void,
396-
// _statusCallback: (status: DataSourceState, err?: any) => void,
397-
// ) => {
398-
// _statusCallback(DataSourceState.Initializing);
399-
// _statusCallback(DataSourceState.Valid);
400-
// _dataCallback(true, { key: 'init1' });
401-
// _statusCallback(DataSourceState.Closed);
402-
// },
403-
// ),
404-
// stop: jest.fn(),
405-
// };
406-
407-
// const mockSynchronizer1 = {
408-
// start: jest
409-
// .fn()
410-
// .mockImplementation(
411-
// (
412-
// _dataCallback: (basis: boolean, data: any) => void,
413-
// _statusCallback: (status: DataSourceState, err?: any) => void,
414-
// ) => {
415-
// _statusCallback(DataSourceState.Initializing);
416-
// _statusCallback(DataSourceState.Off, {
417-
// name: 'Error1',
418-
// message: 'I am an unrecoverable error!',
419-
// });
420-
// },
421-
// ),
422-
// stop: jest.fn(),
423-
// };
424-
425-
// const mockSynchronizer2 = {
426-
// start: jest
427-
// .fn()
428-
// .mockImplementation(
429-
// (
430-
// _dataCallback: (basis: boolean, data: any) => void,
431-
// _statusCallback: (status: DataSourceState, err?: any) => void,
432-
// ) => {
433-
// _statusCallback(DataSourceState.Initializing);
434-
// _statusCallback(DataSourceState.Off, {
435-
// name: 'Error2',
436-
// message: 'I am an unrecoverable error!',
437-
// });
438-
// },
439-
// ),
440-
// stop: jest.fn(),
441-
// };
442-
443-
// const underTest = new CompositeDataSource(
444-
// [makeDataSourceFactory(mockInitializer1)],
445-
// [makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)],
446-
// undefined,
447-
// makeTestTransitionConditions(),
448-
// makeZeroBackoff(),
449-
// );
450-
451-
// let statusCallback;
452-
// await new Promise<void>((resolve) => {
453-
// statusCallback = jest.fn((state: DataSourceState, err: any) => {
454-
// if (err && err.name === 'Error2') {
455-
// resolve();
456-
// }
457-
// });
458-
459-
// underTest.start(jest.fn(), statusCallback);
460-
// });
461-
462-
// expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
463-
// expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
464-
// expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1);
465-
// expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, null);
466-
// expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, null);
467-
// expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, expect.anything());
468-
// expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Off, null);
469-
// });
389+
it('it reports DataSourceState Closed when all synchronizers report Closed with unrecoverable errors', async () => {
390+
const mockInitializer1: DataSource = {
391+
start: jest
392+
.fn()
393+
.mockImplementation(
394+
(
395+
_dataCallback: (basis: boolean, data: any) => void,
396+
_statusCallback: (status: DataSourceState, err?: any) => void,
397+
) => {
398+
_statusCallback(DataSourceState.Initializing);
399+
_statusCallback(DataSourceState.Valid);
400+
_dataCallback(true, { key: 'init1' });
401+
_statusCallback(DataSourceState.Closed);
402+
},
403+
),
404+
stop: jest.fn(),
405+
};
406+
407+
const mockSynchronizer1 = {
408+
start: jest
409+
.fn()
410+
.mockImplementation(
411+
(
412+
_dataCallback: (basis: boolean, data: any) => void,
413+
_statusCallback: (status: DataSourceState, err?: any) => void,
414+
) => {
415+
_statusCallback(DataSourceState.Initializing);
416+
_statusCallback(DataSourceState.Closed, {
417+
name: 'Error1',
418+
message: 'I am an unrecoverable error!',
419+
recoverable: false,
420+
});
421+
},
422+
),
423+
stop: jest.fn(),
424+
};
425+
426+
const mockSynchronizer2 = {
427+
start: jest
428+
.fn()
429+
.mockImplementation(
430+
(
431+
_dataCallback: (basis: boolean, data: any) => void,
432+
_statusCallback: (status: DataSourceState, err?: any) => void,
433+
) => {
434+
_statusCallback(DataSourceState.Initializing);
435+
_statusCallback(DataSourceState.Closed, {
436+
name: 'Error2',
437+
message: 'I am an unrecoverable error!',
438+
recoverable: false,
439+
});
440+
},
441+
),
442+
stop: jest.fn(),
443+
};
444+
445+
const underTest = new CompositeDataSource(
446+
[makeDataSourceFactory(mockInitializer1)],
447+
[makeDataSourceFactory(mockSynchronizer1), makeDataSourceFactory(mockSynchronizer2)],
448+
undefined,
449+
makeTestTransitionConditions(),
450+
makeZeroBackoff(),
451+
);
452+
453+
let statusCallback;
454+
await new Promise<void>((resolve) => {
455+
statusCallback = jest.fn((state: DataSourceState, _: any) => {
456+
if (state === DataSourceState.Closed) {
457+
resolve();
458+
}
459+
});
460+
461+
underTest.start(jest.fn(), statusCallback);
462+
});
463+
464+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
465+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
466+
expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1);
467+
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined);
468+
expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Valid, undefined);
469+
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer closes properly
470+
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, expect.anything()); // sync1 closed with unrecoverable error
471+
expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Interrupted, expect.anything()); // sync2 closed with unrecoverable error
472+
expect(statusCallback).toHaveBeenNthCalledWith(6, DataSourceState.Closed, {
473+
name: 'ExhaustedDataSources',
474+
message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`,
475+
});
476+
});
470477

471478
it('can be stopped when in thrashing synchronizer fallback loop', async () => {
472479
const mockInitializer1 = {
@@ -639,8 +646,7 @@ it('is well behaved with no initializers and no synchronizers configured', async
639646
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined); // initializer
640647
expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, {
641648
name: 'ExhaustedDataSources',
642-
message:
643-
'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).',
649+
message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.',
644650
});
645651
});
646652

@@ -730,8 +736,7 @@ it('is well behaved with an initializer and no synchronizers configured', async
730736
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Interrupted, undefined); // initializer got data
731737
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Closed, {
732738
name: 'ExhaustedDataSources',
733-
message:
734-
'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).',
739+
message: 'CompositeDataSource has exhausted all configured initializers and synchronizers.',
735740
});
736741
});
737742

packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ export enum DataSourceState {
66
Initializing,
77
// Transient issue, automatic retry is expected
88
Interrupted,
9-
// Data source was closed and will not retry
9+
// Data source was closed and will not retry automatically.
1010
Closed,
11-
// This datasource encountered an unrecoverable error and it is not expected to be resolved through trying again in the future
12-
Off,
1311
}
1412

1513
export interface DataSource {

packages/shared/common/src/datasource/CompositeDataSource.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ export class CompositeDataSource implements DataSource {
102102
isPrimary,
103103
cullDSFactory,
104104
} = this._pickDataSource(lastTransition);
105+
105106
const internalTransitionPromise = new Promise<TransitionRequest>((transitionResolve) => {
106107
if (currentDS) {
107108
// these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after
@@ -129,9 +130,9 @@ export class CompositeDataSource implements DataSource {
129130
this._logger?.debug(
130131
`CompositeDataSource received state ${state} from underlying data source.`,
131132
);
132-
if (err || state === DataSourceState.Closed || state === DataSourceState.Off) {
133+
if (err || state === DataSourceState.Closed) {
133134
callbackHandler.disable();
134-
if (state === DataSourceState.Off) {
135+
if (err.recoverable === false) {
135136
// don't use this datasource's factory again
136137
cullDSFactory?.();
137138
}
@@ -173,7 +174,7 @@ export class CompositeDataSource implements DataSource {
173174
transition: 'stop',
174175
err: {
175176
name: 'ExhaustedDataSources',
176-
message: `CompositeDataSource has exhausted all configured datasources (${this._initFactories.length()} initializers, ${this._syncFactories.length()} synchronizers).`,
177+
message: `CompositeDataSource has exhausted all configured initializers and synchronizers.`,
177178
},
178179
});
179180
}
@@ -340,7 +341,7 @@ export class CompositeDataSource implements DataSource {
340341
* This wrapper will ensure the following:
341342
*
342343
* Don't report DataSourceState.Initializing except as first status callback.
343-
* Map underlying DataSourceState.Closed and DataSourceState.Off to interrupted.
344+
* Map underlying DataSourceState.Closed to interrupted.
344345
* Don't report the same status and error twice in a row.
345346
*/
346347
private _wrapStatusCallbackWithSanitizer(
@@ -353,7 +354,7 @@ export class CompositeDataSource implements DataSource {
353354
return (status: DataSourceState, err?: any) => {
354355
let sanitized = status;
355356
// underlying errors, closed state, or off are masked as interrupted while we transition
356-
if (status === DataSourceState.Closed || status === DataSourceState.Off) {
357+
if (status === DataSourceState.Closed) {
357358
sanitized = DataSourceState.Interrupted;
358359
}
359360

packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,14 @@ describe('given a polling processor with a short poll duration', () => {
213213
expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing);
214214
expect(mockStatusCallback).toHaveBeenNthCalledWith(
215215
2,
216-
subsystem.DataSourceState.Off,
216+
subsystem.DataSourceState.Closed,
217217
new LDPollingError(
218218
DataSourceErrorKind.ErrorResponse,
219219
status === 401
220220
? `Received error ${status} (invalid SDK key) for polling request - giving up permanently`
221221
: `Received error ${status} for polling request - giving up permanently`,
222222
status as number,
223+
false,
223224
),
224225
);
225226
setTimeout(() => {

packages/shared/sdk-server/__tests__/data_sources/StreamingProcessorFDv2.test.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,13 @@ describe('given a stream processor with mock event source', () => {
311311
expect(willRetry).toBeFalsy();
312312
expect(mockStatusCallback).toHaveBeenNthCalledWith(
313313
2,
314-
subsystem.DataSourceState.Off,
315-
new LDStreamingError(DataSourceErrorKind.Unknown, testError.message, testError.status),
314+
subsystem.DataSourceState.Closed,
315+
new LDStreamingError(
316+
DataSourceErrorKind.Unknown,
317+
testError.message,
318+
testError.status,
319+
true,
320+
),
316321
);
317322
expect(logger.error).toBeCalledWith(
318323
expect.stringMatching(new RegExp(`${status}.*permanently`)),

packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc
3434
const message = httpErrorMessage(err, 'initializer', 'initializer does not retry');
3535
this._logger?.error(message);
3636
statusCallback(
37-
subsystemCommon.DataSourceState.Off,
37+
subsystemCommon.DataSourceState.Closed,
3838
new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status),
3939
);
4040
return;

packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ export default class PollingProcessorFDv2 implements subsystemCommon.DataSource
5151
const message = httpErrorMessage(err, 'polling request');
5252
this._logger?.error(message);
5353
statusCallback(
54-
subsystemCommon.DataSourceState.Off,
55-
new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status),
54+
subsystemCommon.DataSourceState.Closed,
55+
new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status, false),
5656
);
5757
// It is not recoverable, return and do not trigger another poll.
5858
return;

packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ export default class StreamingProcessorFDv2 implements subsystemCommon.DataSourc
8282
this._logger?.error(httpErrorMessage(err, 'streaming request'));
8383
this._logConnectionResult(false);
8484
statusCallback(
85-
subsystemCommon.DataSourceState.Off,
86-
new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status),
85+
subsystemCommon.DataSourceState.Closed,
86+
new LDStreamingError(DataSourceErrorKind.ErrorResponse, err.message, err.status, false),
8787
);
8888
return false;
8989
}

0 commit comments

Comments
 (0)