Skip to content

Commit 99931f0

Browse files
authored
feat: add FDv2 FileDataSource initializer (#1010)
This PR will introduce file data source initializer for FDv2 that could be specified by adding in an optional `intializerOption` to the FDv2 datasource option. If this option is not specified then the behavior should not change from before this change. Additionally: - I've added some more type definitions to align better with FDv2 as well as a change set builder to convert FDv1 scheme to FDv2. - added a new `custom` data source options mode that allows users to specify the ordering of their initializers and synchronizers NOTE: will need to look into a problem where while the initializers are running in expected order, the client initialized event seems to be prematurely firing. Example with the following config: ``` const ldClient = init(sdkKey, { dataSystem: { dataSource: { dataSourceOptionsType: 'custom', initializers: [ { type: 'file', paths: [path.resolve(__dirname, 'flagdatav1.json')] }, { type: 'polling' }, ], synchronizers: [ { type: 'streaming', streamInitialReconnectDelay: 1 } ], } } }); ``` STDOUT with the `sample-feature` flag set to `false` in LD: ``` fs start fs stop polling started *** SDK successfully initialized! *** The 'sample-feature' feature flag evaluates to true. ██ ██ ████████ ███████ ██ LAUNCHDARKLY █ ███████ ████████ ██ ██ *** Event triggered: update:sample-feature polling stopped *** The 'sample-feature' feature flag evaluates to false. info: [LaunchDarkly] Opened LaunchDarkly stream connection ``` > In the above run, we have the correct ordering where the fs initializer got data, but since it does not report a valid basis, the polling initializer is also being ran. But as you can see, the client reported initialized before the polling initializer was able to update the data to the real thing (but further down we can see that the flag value is being set correctly before the streaming synchronizer runs) I think this issue could be addressed in a separate change. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduces a file-based FDv2 initializer and configurable custom data source pipeline, adds an adaptor to convert FDv1 payloads to FDv2 events, tightens FDv2 proto/types, and wires everything into LDClient with tests. > > - **FDv2 data flow**: > - **New `FDv1PayloadAdaptor`** (`packages/shared/common/src/internal/fdv2/FDv1PayloadAdaptor.ts`): converts FDv1 flags/segments into FDv2 event sequences; exported via `internal/fdv2/index`. > - **`PollingProcessorFDv2`**: replaces inline FDv1 conversion with `FDv1PayloadAdaptor` and sets selector; unchanged FDv2 path. > - **`payloadProcessor`**: relaxes `payload-transferred` validation to allow empty `state` and makes checks null/undefined-safe. > - **`proto`**: adds strict `EventType`, `ObjectKind`, `GoodbyeObject`, `ErrorObject`; refines event `data` union; makes `PayloadTransferred.id` optional. > - **SDK server – data source composition**: > - **Custom mode** in `LDDataSystemOptions`: new `custom` option with ordered `initializers` (`file`|`polling`) and `synchronizers` (`streaming`|`polling`). > - **`LDClientImpl`**: builds `CompositeDataSource` from custom configs; supports file initializer, streaming/polling synchronizers, and FDv1 fallback polling; exports `DEFAULT_STREAM_RECONNECT_DELAY`. > - **`Configuration`**: validates `custom` data source options; exports `DEFAULT_STREAM_RECONNECT_DELAY`. > - **New `FileDataInitializerFDv2`**: loads JSON/YAML from disk, merges multiple files, emits FDv2 payload via `FDv1PayloadAdaptor`; no auto-update; error handling for invalid/missing FS. > - **Tests**: > - Add unit tests for `FDv1PayloadAdaptor`. > - Extend `CompositeDataSource` tests for multi-initializer fallback/short-circuit. > - Add comprehensive tests for `FileDataInitializerFDv2` (JSON/YAML parsing, multi-file merge/overwrite, errors). > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit c42e4f4. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent d03976f commit 99931f0

File tree

12 files changed

+1118
-131
lines changed

12 files changed

+1118
-131
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from '../../../src/internal/fdv2/FDv1PayloadAdaptor';
2+
import { PayloadProcessor } from '../../../src/internal/fdv2/payloadProcessor';
3+
import { Event, PutObject } from '../../../src/internal/fdv2/proto';
4+
5+
// Mock PayloadProcessor that captures events
6+
class MockPayloadProcessor extends PayloadProcessor {
7+
public processedEvents: Event[] = [];
8+
9+
constructor() {
10+
super({}, undefined, undefined);
11+
}
12+
13+
override processEvents(events: Event[]) {
14+
this.processedEvents = [...this.processedEvents, ...events];
15+
// Don't call super.processEvents to avoid side effects in tests
16+
}
17+
}
18+
19+
it('includes server-intent as the first event and payload-transferred as the last eventwith correct structure', () => {
20+
const processor = new MockPayloadProcessor();
21+
const adaptor = FDv1PayloadAdaptor(processor);
22+
adaptor.processFullTransfer({ flags: {}, segments: {} });
23+
24+
const serverIntentEvent = processor.processedEvents[0] as Event;
25+
expect(serverIntentEvent.event).toBe('server-intent');
26+
expect(serverIntentEvent.data).toBeDefined();
27+
28+
const intentData = serverIntentEvent.data as any;
29+
expect(intentData.payloads).toBeDefined();
30+
expect(intentData.payloads.length).toBe(1);
31+
expect(intentData.payloads[0].intentCode).toBe('xfer-full');
32+
expect(intentData.payloads[0].id).toBe('FDv1Fallback');
33+
expect(intentData.payloads[0].target).toBe(1);
34+
expect(intentData.payloads[0].reason).toBe('payload-missing');
35+
36+
const payloadTransferredEvent = processor.processedEvents[
37+
processor.processedEvents.length - 1
38+
] as Event;
39+
expect(payloadTransferredEvent.event).toBe('payload-transferred');
40+
expect(payloadTransferredEvent.data).toBeDefined();
41+
42+
const transferredData = payloadTransferredEvent.data as any;
43+
expect(transferredData.state).toBe('');
44+
expect(transferredData.version).toBe(1);
45+
expect(transferredData.id).toBe('FDv1Fallback');
46+
});
47+
48+
it('pushFdv1Payload adds put-object events for flags and segments', () => {
49+
const processor = new MockPayloadProcessor();
50+
const adaptor = FDv1PayloadAdaptor(processor);
51+
const fdv1Payload = {
52+
flags: {
53+
'flag-1': { key: 'flag-1', version: 1, on: true },
54+
'flag-2': { key: 'flag-2', version: 2, on: false },
55+
},
56+
segments: {
57+
'segment-1': { key: 'segment-1', version: 1 },
58+
},
59+
};
60+
61+
adaptor.processFullTransfer(fdv1Payload);
62+
63+
const putObjectEvents = processor.processedEvents.filter((e) => e.event === 'put-object');
64+
expect(putObjectEvents.length).toBe(3);
65+
66+
const flag1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-1');
67+
expect(flag1Event).toBeDefined();
68+
expect((flag1Event!.data as PutObject).kind).toBe('flag');
69+
expect((flag1Event!.data as PutObject).version).toBe(1);
70+
71+
const flag2Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'flag-2');
72+
expect(flag2Event).toBeDefined();
73+
expect((flag2Event!.data as PutObject).kind).toBe('flag');
74+
expect((flag2Event!.data as PutObject).version).toBe(2);
75+
76+
const segment1Event = putObjectEvents.find((e) => (e.data as PutObject).key === 'segment-1');
77+
expect(segment1Event).toBeDefined();
78+
expect((segment1Event!.data as PutObject).kind).toBe('segment');
79+
expect((segment1Event!.data as PutObject).version).toBe(1);
80+
});

packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,161 @@ it('consumes cancellation tokens correctly', async () => {
10041004
// eslint-disable-next-line no-underscore-dangle
10051005
expect(underTest._cancelTokens.length).toEqual(0);
10061006
});
1007+
1008+
it('handles multiple initializers with fallback when first initializer fails and second succeeds', async () => {
1009+
const mockInitializer1Error = {
1010+
name: 'Error',
1011+
message: 'First initializer failed',
1012+
};
1013+
const mockInitializer1: DataSource = {
1014+
start: jest
1015+
.fn()
1016+
.mockImplementation(
1017+
(
1018+
_dataCallback: (basis: boolean, data: any) => void,
1019+
_statusCallback: (status: DataSourceState, err?: any) => void,
1020+
) => {
1021+
_statusCallback(DataSourceState.Initializing);
1022+
_statusCallback(DataSourceState.Closed, mockInitializer1Error);
1023+
},
1024+
),
1025+
stop: jest.fn(),
1026+
};
1027+
1028+
const mockInitializer2Data = { key: 'init2' };
1029+
const mockInitializer2: DataSource = {
1030+
start: jest
1031+
.fn()
1032+
.mockImplementation(
1033+
(
1034+
_dataCallback: (basis: boolean, data: any) => void,
1035+
_statusCallback: (status: DataSourceState, err?: any) => void,
1036+
) => {
1037+
_statusCallback(DataSourceState.Initializing);
1038+
_statusCallback(DataSourceState.Valid);
1039+
_dataCallback(true, mockInitializer2Data);
1040+
_statusCallback(DataSourceState.Closed);
1041+
},
1042+
),
1043+
stop: jest.fn(),
1044+
};
1045+
1046+
const mockSynchronizer1Data = { key: 'sync1' };
1047+
const mockSynchronizer1 = {
1048+
start: jest
1049+
.fn()
1050+
.mockImplementation(
1051+
(
1052+
_dataCallback: (basis: boolean, data: any) => void,
1053+
_statusCallback: (status: DataSourceState, err?: any) => void,
1054+
) => {
1055+
_statusCallback(DataSourceState.Initializing);
1056+
_statusCallback(DataSourceState.Valid);
1057+
_dataCallback(false, mockSynchronizer1Data);
1058+
},
1059+
),
1060+
stop: jest.fn(),
1061+
};
1062+
1063+
const underTest = new CompositeDataSource(
1064+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1065+
[makeDataSourceFactory(mockSynchronizer1)],
1066+
[],
1067+
undefined,
1068+
makeTestTransitionConditions(),
1069+
makeZeroBackoff(),
1070+
);
1071+
1072+
let dataCallback;
1073+
const statusCallback = jest.fn();
1074+
await new Promise<void>((resolve) => {
1075+
dataCallback = jest.fn((_: boolean, data: any) => {
1076+
if (data === mockSynchronizer1Data) {
1077+
resolve();
1078+
}
1079+
});
1080+
1081+
underTest.start(dataCallback, statusCallback);
1082+
});
1083+
1084+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1085+
expect(mockInitializer2.start).toHaveBeenCalledTimes(1);
1086+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1087+
expect(statusCallback).toHaveBeenCalledTimes(5);
1088+
expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Initializing, undefined);
1089+
expect(statusCallback).toHaveBeenNthCalledWith(
1090+
2,
1091+
DataSourceState.Interrupted,
1092+
mockInitializer1Error,
1093+
);
1094+
expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Valid, undefined);
1095+
expect(statusCallback).toHaveBeenNthCalledWith(4, DataSourceState.Interrupted, undefined);
1096+
expect(statusCallback).toHaveBeenNthCalledWith(5, DataSourceState.Valid, undefined);
1097+
});
1098+
1099+
it('does not run second initializer when first initializer succeeds', async () => {
1100+
const mockInitializer1Data = { key: 'init1' };
1101+
const mockInitializer1: DataSource = {
1102+
start: jest
1103+
.fn()
1104+
.mockImplementation(
1105+
(
1106+
_dataCallback: (basis: boolean, data: any) => void,
1107+
_statusCallback: (status: DataSourceState, err?: any) => void,
1108+
) => {
1109+
_statusCallback(DataSourceState.Initializing);
1110+
_statusCallback(DataSourceState.Valid);
1111+
_dataCallback(true, mockInitializer1Data);
1112+
_statusCallback(DataSourceState.Closed);
1113+
},
1114+
),
1115+
stop: jest.fn(),
1116+
};
1117+
1118+
const mockInitializer2: DataSource = {
1119+
start: jest.fn(),
1120+
stop: jest.fn(),
1121+
};
1122+
1123+
const mockSynchronizer1Data = { key: 'sync1' };
1124+
const mockSynchronizer1 = {
1125+
start: jest
1126+
.fn()
1127+
.mockImplementation(
1128+
(
1129+
_dataCallback: (basis: boolean, data: any) => void,
1130+
_statusCallback: (status: DataSourceState, err?: any) => void,
1131+
) => {
1132+
_statusCallback(DataSourceState.Initializing);
1133+
_statusCallback(DataSourceState.Valid);
1134+
_dataCallback(false, mockSynchronizer1Data);
1135+
},
1136+
),
1137+
stop: jest.fn(),
1138+
};
1139+
1140+
const underTest = new CompositeDataSource(
1141+
[makeDataSourceFactory(mockInitializer1), makeDataSourceFactory(mockInitializer2)],
1142+
[makeDataSourceFactory(mockSynchronizer1)],
1143+
[],
1144+
undefined,
1145+
makeTestTransitionConditions(),
1146+
makeZeroBackoff(),
1147+
);
1148+
1149+
let dataCallback;
1150+
const statusCallback = jest.fn();
1151+
await new Promise<void>((resolve) => {
1152+
dataCallback = jest.fn((_: boolean, data: any) => {
1153+
if (data === mockSynchronizer1Data) {
1154+
resolve();
1155+
}
1156+
});
1157+
1158+
underTest.start(dataCallback, statusCallback);
1159+
});
1160+
1161+
expect(mockInitializer1.start).toHaveBeenCalledTimes(1);
1162+
expect(mockInitializer2.start).toHaveBeenCalledTimes(0);
1163+
expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1);
1164+
});
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { PayloadProcessor } from './payloadProcessor';
2+
import { Event } from './proto';
3+
4+
interface fdv1Payload {
5+
flags: { [name: string]: any };
6+
segments: { [name: string]: any };
7+
}
8+
9+
const PAYLOAD_ID = 'FDv1Fallback';
10+
11+
/**
12+
* The FDv1PayloadAdaptor is a helper class that converts FDv1 payloads into events that the PayloadProcessor can understand.
13+
*/
14+
export interface FDv1PayloadAdaptor {
15+
/**
16+
* The PayloadProcessor that will be used to process the events.
17+
*/
18+
readonly _processor: PayloadProcessor;
19+
20+
/**
21+
* The selector that will be used to identify the payload.
22+
*/
23+
_selector: string;
24+
25+
/**
26+
* The method that will be used to set a selector for the payload that is
27+
* being processed.
28+
*
29+
* @remarks
30+
* This method probably shouldn't be used in most instances as FDv1 payloads
31+
* do not have the concept of a selector.
32+
*
33+
* @param selector - The selector to set for the payload.
34+
* @returns this FDv1PayloadAdaptor instance
35+
*/
36+
useSelector: (selector: string) => FDv1PayloadAdaptor;
37+
38+
/**
39+
* The method that will be used to process a full transfer changeset.
40+
*
41+
* @param data - The data to process.
42+
*/
43+
processFullTransfer: (data: fdv1Payload) => void;
44+
}
45+
46+
export function fdv1PayloadAdaptor(processor: PayloadProcessor): FDv1PayloadAdaptor {
47+
return {
48+
_processor: processor,
49+
_selector: '',
50+
useSelector(selector: string): FDv1PayloadAdaptor {
51+
this._selector = selector;
52+
return this;
53+
},
54+
processFullTransfer(data) {
55+
const events: Array<Event> = [
56+
{
57+
event: 'server-intent',
58+
data: {
59+
payloads: [
60+
{
61+
id: PAYLOAD_ID,
62+
target: 1,
63+
intentCode: 'xfer-full',
64+
reason: 'payload-missing',
65+
},
66+
],
67+
},
68+
},
69+
];
70+
71+
Object.entries(data?.flags || []).forEach(([key, flag]) => {
72+
events.push({
73+
event: 'put-object',
74+
data: {
75+
kind: 'flag',
76+
key,
77+
version: flag.version || 1,
78+
object: flag,
79+
},
80+
});
81+
});
82+
83+
Object.entries(data?.segments || []).forEach(([key, segment]) => {
84+
events.push({
85+
event: 'put-object',
86+
data: {
87+
kind: 'segment',
88+
key,
89+
version: segment.version || 1,
90+
object: segment,
91+
},
92+
});
93+
});
94+
95+
events.push({
96+
event: 'payload-transferred',
97+
data: {
98+
// IMPORTANT: the selector MUST be empty or "live" data synchronizers
99+
// will not work as it would try to resume from a bogus state.
100+
state: this._selector,
101+
version: 1,
102+
id: PAYLOAD_ID,
103+
},
104+
});
105+
106+
this._processor.processEvents(events);
107+
},
108+
};
109+
}

packages/shared/common/src/internal/fdv2/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { fdv1PayloadAdaptor as FDv1PayloadAdaptor } from './FDv1PayloadAdaptor';
12
import {
23
FDv2EventsCollection,
34
Payload,
@@ -14,4 +15,5 @@ export {
1415
PayloadProcessor,
1516
PayloadStreamReader,
1617
Update,
18+
FDv1PayloadAdaptor,
1719
};

packages/shared/common/src/internal/fdv2/payloadProcessor.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,11 @@ export class PayloadProcessor {
221221
private _processPayloadTransferred = (data: PayloadTransferred) => {
222222
// if the following properties haven't been provided by now, we should reset
223223
if (
224-
!this._tempId || // server intent hasn't been received yet.
225-
!data.state ||
224+
// server intent hasn't been received yet.
225+
!this._tempId ||
226+
// selector can be an empty string if we are using a file data initilizer
227+
data.state === null ||
228+
data.state === undefined ||
226229
!data.version
227230
) {
228231
this._resetAll(); // a reset is best defensive action since payload transferred terminates a payload

0 commit comments

Comments
 (0)