Skip to content

Commit 67a164b

Browse files
Update splitChangesUpdater, splitChangesFetcher and related types
1 parent e22b286 commit 67a164b

File tree

8 files changed

+102
-60
lines changed

8 files changed

+102
-60
lines changed

src/dtos/types.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,16 @@ export type ISplitPartial = Pick<ISplit, 'conditions' | 'configurations' | 'traf
228228

229229
/** Interface of the parsed JSON response of `/splitChanges` */
230230
export interface ISplitChangesResponse {
231-
till: number,
232-
splits: ISplit[]
231+
ff?: {
232+
t: number,
233+
s?: number,
234+
d: ISplit[]
235+
},
236+
rbs?: {
237+
t: number,
238+
s?: number,
239+
d: IRBSegment[]
240+
}
233241
}
234242

235243
/** Interface of the parsed JSON response of `/segmentChanges/{segmentName}` */

src/logger/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export const RETRIEVE_MANAGER = 29;
2121
export const SYNC_OFFLINE_DATA = 30;
2222
export const SYNC_SPLITS_FETCH = 31;
2323
export const SYNC_SPLITS_UPDATE = 32;
24+
export const SYNC_RBS_UPDATE = 33;
2425
export const STREAMING_NEW_MESSAGE = 35;
2526
export const SYNC_TASK_START = 36;
2627
export const SYNC_TASK_EXECUTE = 37;

src/logger/messages/debug.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ export const codesDebug: [number, string][] = codesInfo.concat([
2020
[c.RETRIEVE_MANAGER, 'Retrieving manager instance.'],
2121
// synchronizer
2222
[c.SYNC_OFFLINE_DATA, c.LOG_PREFIX_SYNC_OFFLINE + 'Feature flags data: \n%s'],
23-
[c.SYNC_SPLITS_FETCH, c.LOG_PREFIX_SYNC_SPLITS + 'Spin up feature flags update using since = %s'],
24-
[c.SYNC_SPLITS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New feature flags %s. Removed feature flags %s. Segment names collected %s'],
23+
[c.SYNC_SPLITS_FETCH, c.LOG_PREFIX_SYNC_SPLITS + 'Spin up feature flags update using since = %s and rbSince = %s.'],
24+
[c.SYNC_SPLITS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New feature flags %s. Removed feature flags %s.'],
25+
[c.SYNC_RBS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New rule-based segments %s. Removed rule-based segments %s.'],
2526
[c.STREAMING_NEW_MESSAGE, c.LOG_PREFIX_SYNC_STREAMING + 'New SSE message received, with data: %s.'],
2627
[c.SYNC_TASK_START, c.LOG_PREFIX_SYNC + ': Starting %s. Running each %s millis'],
2728
[c.SYNC_TASK_EXECUTE, c.LOG_PREFIX_SYNC + ': Running %s'],

src/services/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export type ISplitHttpClient = (url: string, options?: IRequestOptions, latencyT
3535

3636
export type IFetchAuth = (userKeys?: string[]) => Promise<IResponse>
3737

38-
export type IFetchSplitChanges = (since: number, noCache?: boolean, till?: number) => Promise<IResponse>
38+
export type IFetchSplitChanges = (since: number, noCache?: boolean, till?: number, rbSince?: number) => Promise<IResponse>
3939

4040
export type IFetchSegmentChanges = (since: number, segmentName: string, noCache?: boolean, till?: number) => Promise<IResponse>
4141

src/sync/polling/fetchers/splitChangesFetcher.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ export function splitChangesFetcherFactory(fetchSplitChanges: IFetchSplitChanges
1111
since: number,
1212
noCache?: boolean,
1313
till?: number,
14+
rbSince?: number,
1415
// Optional decorator for `fetchSplitChanges` promise, such as timeout or time tracker
1516
decorator?: (promise: Promise<IResponse>) => Promise<IResponse>
1617
) {
1718

18-
let splitsPromise = fetchSplitChanges(since, noCache, till);
19+
let splitsPromise = fetchSplitChanges(since, noCache, till, rbSince);
1920
if (decorator) splitsPromise = decorator(splitsPromise);
2021

2122
return splitsPromise.then(resp => resp.json());

src/sync/polling/fetchers/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export type ISplitChangesFetcher = (
55
since: number,
66
noCache?: boolean,
77
till?: number,
8+
rbSince?: number,
89
decorator?: (promise: Promise<IResponse>) => Promise<IResponse>
910
) => Promise<ISplitChangesResponse>
1011

src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import { splitApiFactory } from '../../../../services/splitApi';
44
import { SegmentsCacheInMemory } from '../../../../storages/inMemory/SegmentsCacheInMemory';
55
import { SplitsCacheInMemory } from '../../../../storages/inMemory/SplitsCacheInMemory';
66
import { splitChangesFetcherFactory } from '../../fetchers/splitChangesFetcher';
7-
import { splitChangesUpdaterFactory, parseSegments, computeSplitsMutation } from '../splitChangesUpdater';
7+
import { splitChangesUpdaterFactory, parseSegments, computeMutation } from '../splitChangesUpdater';
88
import splitChangesMock1 from '../../../../__tests__/mocks/splitchanges.since.-1.json';
99
import fetchMock from '../../../../__tests__/testUtils/fetchMock';
1010
import { fullSettings, settingsSplitApi } from '../../../../utils/settingsValidation/__tests__/settings.mocks';
1111
import { EventEmitter } from '../../../../utils/MinEvents';
1212
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
1313
import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker';
1414
import { splitNotifications } from '../../../streaming/__tests__/dataMocks';
15+
import { RBSegmentsCacheInMemory } from '../../../../storages/inMemory/RBSegmentsCacheInMemory';
1516

1617
const ARCHIVED_FF = 'ARCHIVED';
1718

@@ -94,58 +95,60 @@ test('splitChangesUpdater / segments parser', () => {
9495
test('splitChangesUpdater / compute splits mutation', () => {
9596
const splitFiltersValidation = { queryString: null, groupedFilters: { bySet: [], byName: [], byPrefix: [] }, validFilters: [] };
9697

97-
let splitsMutation = computeSplitsMutation([activeSplitWithSegments, archivedSplit] as ISplit[], splitFiltersValidation);
98+
let segments = new Set<string>();
99+
let splitsMutation = computeMutation([activeSplitWithSegments, archivedSplit] as ISplit[], segments, splitFiltersValidation);
98100

99101
expect(splitsMutation.added).toEqual([activeSplitWithSegments]);
100102
expect(splitsMutation.removed).toEqual([archivedSplit]);
101-
expect(splitsMutation.segments).toEqual(['A', 'B']);
103+
expect(Array.from(segments)).toEqual(['A', 'B']);
102104

103105
// SDK initialization without sets
104106
// should process all the notifications
105-
splitsMutation = computeSplitsMutation([testFFSetsAB, test2FFSetsX] as ISplit[], splitFiltersValidation);
107+
segments = new Set<string>();
108+
splitsMutation = computeMutation([testFFSetsAB, test2FFSetsX] as ISplit[], segments, splitFiltersValidation);
106109

107110
expect(splitsMutation.added).toEqual([testFFSetsAB, test2FFSetsX]);
108111
expect(splitsMutation.removed).toEqual([]);
109-
expect(splitsMutation.segments).toEqual([]);
112+
expect(Array.from(segments)).toEqual([]);
110113
});
111114

112115
test('splitChangesUpdater / compute splits mutation with filters', () => {
113116
// SDK initialization with sets: [set_a, set_b]
114117
let splitFiltersValidation = { queryString: '&sets=set_a,set_b', groupedFilters: { bySet: ['set_a', 'set_b'], byName: ['name_1'], byPrefix: [] }, validFilters: [] };
115118

116119
// fetching new feature flag in sets A & B
117-
let splitsMutation = computeSplitsMutation([testFFSetsAB], splitFiltersValidation);
120+
let splitsMutation = computeMutation([testFFSetsAB], new Set(), splitFiltersValidation);
118121

119122
// should add it to mutations
120123
expect(splitsMutation.added).toEqual([testFFSetsAB]);
121124
expect(splitsMutation.removed).toEqual([]);
122125

123126
// fetching existing test feature flag removed from set B
124-
splitsMutation = computeSplitsMutation([testFFRemoveSetB], splitFiltersValidation);
127+
splitsMutation = computeMutation([testFFRemoveSetB], new Set(), splitFiltersValidation);
125128

126129
expect(splitsMutation.added).toEqual([testFFRemoveSetB]);
127130
expect(splitsMutation.removed).toEqual([]);
128131

129132
// fetching existing test feature flag removed from set B
130-
splitsMutation = computeSplitsMutation([testFFRemoveSetA], splitFiltersValidation);
133+
splitsMutation = computeMutation([testFFRemoveSetA], new Set(), splitFiltersValidation);
131134

132135
expect(splitsMutation.added).toEqual([]);
133136
expect(splitsMutation.removed).toEqual([testFFRemoveSetA]);
134137

135138
// fetching existing test feature flag removed from set B
136-
splitsMutation = computeSplitsMutation([testFFEmptySet], splitFiltersValidation);
139+
splitsMutation = computeMutation([testFFEmptySet], new Set(), splitFiltersValidation);
137140

138141
expect(splitsMutation.added).toEqual([]);
139142
expect(splitsMutation.removed).toEqual([testFFEmptySet]);
140143

141144
// SDK initialization with names: ['test2']
142145
splitFiltersValidation = { queryString: '&names=test2', groupedFilters: { bySet: [], byName: ['test2'], byPrefix: [] }, validFilters: [] };
143-
splitsMutation = computeSplitsMutation([testFFSetsAB], splitFiltersValidation);
146+
splitsMutation = computeMutation([testFFSetsAB], new Set(), splitFiltersValidation);
144147

145148
expect(splitsMutation.added).toEqual([]);
146149
expect(splitsMutation.removed).toEqual([testFFSetsAB]);
147150

148-
splitsMutation = computeSplitsMutation([test2FFSetsX, testFFEmptySet], splitFiltersValidation);
151+
splitsMutation = computeMutation([test2FFSetsX, testFFEmptySet], new Set(), splitFiltersValidation);
149152

150153
expect(splitsMutation.added).toEqual([test2FFSetsX]);
151154
expect(splitsMutation.removed).toEqual([testFFEmptySet]);
@@ -161,10 +164,13 @@ describe('splitChangesUpdater', () => {
161164
const splits = new SplitsCacheInMemory();
162165
const updateSplits = jest.spyOn(splits, 'update');
163166

167+
const rbSegments = new RBSegmentsCacheInMemory();
168+
// @TODO spy on rbSegments
169+
164170
const segments = new SegmentsCacheInMemory();
165171
const registerSegments = jest.spyOn(segments, 'registerSegments');
166172

167-
const storage = { splits, segments };
173+
const storage = { splits, rbSegments, segments };
168174

169175
const readinessManager = readinessManagerFactory(EventEmitter, fullSettings);
170176
const splitsEmitSpy = jest.spyOn(readinessManager.splits, 'emit');

src/sync/polling/updaters/splitChangesUpdater.ts

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { ISegmentsCacheBase, IStorageBase } from '../../../storages/types';
22
import { ISplitChangesFetcher } from '../fetchers/types';
3-
import { ISplit, ISplitChangesResponse, ISplitFiltersValidation } from '../../../dtos/types';
3+
import { IRBSegment, ISplit, ISplitChangesResponse, ISplitFiltersValidation, MaybeThenable } from '../../../dtos/types';
44
import { ISplitsEventEmitter } from '../../../readiness/types';
55
import { timeout } from '../../../utils/promise/timeout';
66
import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/constants';
77
import { ILogger } from '../../../logger/types';
8-
import { SYNC_SPLITS_FETCH, SYNC_SPLITS_UPDATE, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants';
8+
import { SYNC_SPLITS_FETCH, SYNC_SPLITS_UPDATE, SYNC_RBS_UPDATE, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants';
99
import { startsWith } from '../../../utils/lang';
10-
import { IN_SEGMENT } from '../../../utils/constants';
10+
import { IN_RULE_BASED_SEGMENT, IN_SEGMENT } from '../../../utils/constants';
1111
import { setToArray } from '../../../utils/lang/sets';
1212

1313
type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise<boolean>
@@ -27,24 +27,23 @@ function checkAllSegmentsExist(segments: ISegmentsCacheBase): Promise<boolean> {
2727
* Collect segments from a raw split definition.
2828
* Exported for testing purposes.
2929
*/
30-
export function parseSegments({ conditions }: ISplit): Set<string> {
30+
export function parseSegments({ conditions }: ISplit | IRBSegment, matcherType: typeof IN_SEGMENT | typeof IN_RULE_BASED_SEGMENT = IN_SEGMENT): Set<string> {
3131
let segments = new Set<string>();
3232

3333
for (let i = 0; i < conditions.length; i++) {
3434
const matchers = conditions[i].matcherGroup.matchers;
3535

3636
matchers.forEach(matcher => {
37-
if (matcher.matcherType === IN_SEGMENT) segments.add(matcher.userDefinedSegmentMatcherData.segmentName);
37+
if (matcher.matcherType === matcherType) segments.add(matcher.userDefinedSegmentMatcherData.segmentName);
3838
});
3939
}
4040

4141
return segments;
4242
}
4343

44-
interface ISplitMutations {
45-
added: ISplit[],
46-
removed: ISplit[],
47-
segments: string[]
44+
interface ISplitMutations<T extends ISplit | IRBSegment> {
45+
added: T[],
46+
removed: T[]
4847
}
4948

5049
/**
@@ -68,30 +67,30 @@ function matchFilters(featureFlag: ISplit, filters: ISplitFiltersValidation) {
6867
return matchNames || matchPrefix;
6968
}
7069

70+
function isFF(ruleBasedEntity: IRBSegment | ISplit): ruleBasedEntity is ISplit {
71+
return (ruleBasedEntity as ISplit).defaultTreatment !== undefined;
72+
}
73+
7174
/**
7275
* Given the list of splits from /splitChanges endpoint, it returns the mutations,
7376
* i.e., an object with added splits, removed splits and used segments.
7477
* Exported for testing purposes.
7578
*/
76-
export function computeSplitsMutation(entries: ISplit[], filters: ISplitFiltersValidation): ISplitMutations {
77-
const segments = new Set<string>();
78-
const computed = entries.reduce((accum, split) => {
79-
if (split.status === 'ACTIVE' && matchFilters(split, filters)) {
80-
accum.added.push(split);
79+
export function computeMutation<T extends ISplit | IRBSegment>(rules: Array<T>, segments: Set<string>, filters?: ISplitFiltersValidation): ISplitMutations<T> {
8180

82-
parseSegments(split).forEach((segmentName: string) => {
81+
return rules.reduce((accum, ruleBasedEntity) => {
82+
if (ruleBasedEntity.status === 'ACTIVE' && (!filters || matchFilters(ruleBasedEntity as ISplit, filters))) {
83+
accum.added.push(ruleBasedEntity);
84+
85+
parseSegments(ruleBasedEntity).forEach((segmentName: string) => {
8386
segments.add(segmentName);
8487
});
8588
} else {
86-
accum.removed.push(split);
89+
accum.removed.push(ruleBasedEntity);
8790
}
8891

8992
return accum;
90-
}, { added: [], removed: [], segments: [] } as ISplitMutations);
91-
92-
computed.segments = setToArray(segments);
93-
94-
return computed;
93+
}, { added: [], removed: [] } as ISplitMutations<T>);
9594
}
9695

9796
/**
@@ -111,14 +110,14 @@ export function computeSplitsMutation(entries: ISplit[], filters: ISplitFiltersV
111110
export function splitChangesUpdaterFactory(
112111
log: ILogger,
113112
splitChangesFetcher: ISplitChangesFetcher,
114-
storage: Pick<IStorageBase, 'splits' | 'segments'>,
113+
storage: Pick<IStorageBase, 'splits' | 'rbSegments' | 'segments'>,
115114
splitFiltersValidation: ISplitFiltersValidation,
116115
splitsEventEmitter?: ISplitsEventEmitter,
117116
requestTimeoutBeforeReady: number = 0,
118117
retriesOnFailureBeforeReady: number = 0,
119118
isClientSide?: boolean
120119
): ISplitChangesUpdater {
121-
const { splits, segments } = storage;
120+
const { splits, rbSegments, segments } = storage;
122121

123122
let startingUp = true;
124123

@@ -135,35 +134,60 @@ export function splitChangesUpdaterFactory(
135134
* @param noCache - true to revalidate data to fetch
136135
* @param till - query param to bypass CDN requests
137136
*/
138-
return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) {
137+
return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }) {
139138

140139
/**
141140
* @param since - current changeNumber at splitsCache
142141
* @param retry - current number of retry attempts
143142
*/
144-
function _splitChangesUpdater(since: number, retry = 0): Promise<boolean> {
145-
log.debug(SYNC_SPLITS_FETCH, [since]);
146-
const fetcherPromise = Promise.resolve(splitUpdateNotification ?
147-
{ splits: [splitUpdateNotification.payload], till: splitUpdateNotification.changeNumber } :
148-
splitChangesFetcher(since, noCache, till, _promiseDecorator)
143+
function _splitChangesUpdater(sinces: [number, number], retry = 0): Promise<boolean> {
144+
const [since, rbSince] = sinces;
145+
log.debug(SYNC_SPLITS_FETCH, sinces);
146+
const fetcherPromise = Promise.resolve(
147+
splitUpdateNotification ?
148+
isFF(splitUpdateNotification.payload) ?
149+
// IFFU edge case: a change to a flag that adds an IN_RULE_BASED_SEGMENT matcher that is not present yet
150+
Promise.resolve(rbSegments.contains(parseSegments(splitUpdateNotification.payload, IN_RULE_BASED_SEGMENT))).then((contains) => {
151+
return contains ?
152+
{ ff: { d: [splitUpdateNotification.payload as ISplit], t: splitUpdateNotification.changeNumber } } :
153+
splitChangesFetcher(since, noCache, till, rbSince, _promiseDecorator);
154+
}) :
155+
{ rbs: { d: [splitUpdateNotification.payload as IRBSegment], t: splitUpdateNotification.changeNumber } } :
156+
splitChangesFetcher(since, noCache, till, rbSince, _promiseDecorator)
149157
)
150158
.then((splitChanges: ISplitChangesResponse) => {
151159
startingUp = false;
152160

153-
const mutation = computeSplitsMutation(splitChanges.splits, splitFiltersValidation);
161+
const usedSegments = new Set<string>();
162+
163+
let ffUpdate: MaybeThenable<boolean> = false;
164+
if (splitChanges.ff) {
165+
const { added, removed } = computeMutation(splitChanges.ff.d, usedSegments, splitFiltersValidation);
166+
log.debug(SYNC_SPLITS_UPDATE, [added.length, removed.length]);
167+
ffUpdate = splits.update(added, removed, splitChanges.ff.t);
168+
}
154169

155-
log.debug(SYNC_SPLITS_UPDATE, [mutation.added.length, mutation.removed.length, mutation.segments.length]);
170+
let rbsUpdate: MaybeThenable<boolean> = false;
171+
if (splitChanges.rbs) {
172+
const { added, removed } = computeMutation(splitChanges.rbs.d, usedSegments);
173+
log.debug(SYNC_RBS_UPDATE, [added.length, removed.length]);
174+
rbsUpdate = rbSegments.update(added, removed, splitChanges.rbs.t);
175+
}
156176

157-
// Write into storage
158-
// @TODO call `setChangeNumber` only if the other storage operations have succeeded, in order to keep storage consistency
159-
return Promise.all([
160-
splits.update(mutation.added, mutation.removed, splitChanges.till),
161-
segments.registerSegments(mutation.segments)
162-
]).then(([isThereUpdate]) => {
177+
return Promise.all([ffUpdate, rbsUpdate,
178+
// @TODO if at least 1 segment fetch fails due to 404 and other segments are updated in the storage, SDK_UPDATE is not emitted
179+
segments.registerSegments(setToArray(usedSegments))
180+
]).then(([ffChanged, rbsChanged]) => {
163181
if (splitsEventEmitter) {
164182
// To emit SDK_SPLITS_ARRIVED for server-side SDK, we must check that all registered segments have been fetched
165-
return Promise.resolve(!splitsEventEmitter.splitsArrived || (since !== splitChanges.till && isThereUpdate && (isClientSide || checkAllSegmentsExist(segments))))
166-
.catch(() => false /** noop. just to handle a possible `checkAllSegmentsExist` rejection, before emitting SDK event */)
183+
return Promise.resolve(!splitsEventEmitter.splitsArrived ||
184+
(
185+
(!splitChanges.ff || since !== splitChanges.ff.t) &&
186+
(!splitChanges.rbs || rbSince !== splitChanges.rbs.t) &&
187+
(ffChanged || rbsChanged) &&
188+
(isClientSide || checkAllSegmentsExist(segments))
189+
)
190+
)
167191
.then(emitSplitsArrivedEvent => {
168192
// emit SDK events
169193
if (emitSplitsArrivedEvent) splitsEventEmitter.emit(SDK_SPLITS_ARRIVED);
@@ -179,7 +203,7 @@ export function splitChangesUpdaterFactory(
179203
if (startingUp && retriesOnFailureBeforeReady > retry) {
180204
retry += 1;
181205
log.info(SYNC_SPLITS_FETCH_RETRY, [retry, error]);
182-
return _splitChangesUpdater(since, retry);
206+
return _splitChangesUpdater(sinces, retry);
183207
} else {
184208
startingUp = false;
185209
}
@@ -196,7 +220,7 @@ export function splitChangesUpdaterFactory(
196220
return fetcherPromise;
197221
}
198222

199-
let sincePromise = Promise.resolve(splits.getChangeNumber()); // `getChangeNumber` never rejects or throws error
200-
return sincePromise.then(_splitChangesUpdater);
223+
// `getChangeNumber` never rejects or throws error
224+
return Promise.all([splits.getChangeNumber(), rbSegments.getChangeNumber()]).then(_splitChangesUpdater);
201225
};
202226
}

0 commit comments

Comments
 (0)