Skip to content

Commit 0c9e297

Browse files
committed
Merge remote-tracking branch 'origin/rbs-oldspec-fetcher' into rbs-oldspec-storage
2 parents c8f8533 + 4a07bc6 commit 0c9e297

33 files changed

+619
-336
lines changed

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.split.Spec;
66
import io.split.client.dtos.SplitChange;
77
import io.split.client.dtos.SplitHttpResponse;
8+
import io.split.client.dtos.RuleBasedSegment;
89
import io.split.client.dtos.SplitChangesOldPayloadDto;
910
import io.split.client.dtos.ChangeDto;
1011
import io.split.client.dtos.Split;
@@ -24,8 +25,10 @@
2425

2526
import java.net.URI;
2627
import java.net.URISyntaxException;
28+
import java.util.ArrayList;
2729

2830
import static com.google.common.base.Preconditions.checkNotNull;
31+
import static io.split.Spec.SPEC_VERSION;
2932
import static io.split.Spec.SPEC_1_3;
3033
import static io.split.Spec.SPEC_1_1;
3134

@@ -41,7 +44,6 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
4144
private static final String TILL = "till";
4245
private static final String SETS = "sets";
4346
private static final String SPEC = "s";
44-
private String specVersion = SPEC_1_3;
4547
private int PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 24 * 60 * 60 * 1000;
4648
private Long _lastProxyCheckTimestamp = 0L;
4749
private final SplitHttpClient _client;
@@ -73,11 +75,9 @@ public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
7375
long start = System.currentTimeMillis();
7476
SplitHttpResponse response;
7577
try {
76-
if (specVersion.equals(SPEC_1_1) && (System.currentTimeMillis() - _lastProxyCheckTimestamp >= PROXY_CHECK_INTERVAL_MILLISECONDS_SS)) {
78+
if (SPEC_VERSION.equals(SPEC_1_1) && (System.currentTimeMillis() - _lastProxyCheckTimestamp >= PROXY_CHECK_INTERVAL_MILLISECONDS_SS)) {
7779
_log.info("Switching to new Feature flag spec ({}) and fetching.", SPEC_1_3);
78-
specVersion = SPEC_1_3;
79-
since = -1;
80-
sinceRBS = -1;
80+
SPEC_VERSION = SPEC_1_3;
8181
}
8282
URI uri = buildURL(options, since, sinceRBS);
8383
response = _client.get(uri, options, null);
@@ -87,8 +87,8 @@ public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
8787
throw new UriTooLongException(String.format("Status code: %s. Message: %s", response.statusCode(), response.statusMessage()));
8888
}
8989

90-
if (response.statusCode() == HttpStatus.SC_BAD_REQUEST && specVersion.equals(Spec.SPEC_1_3) && _rootURIOverriden) {
91-
specVersion = Spec.SPEC_1_1;
90+
if (response.statusCode() == HttpStatus.SC_BAD_REQUEST && SPEC_VERSION.equals(Spec.SPEC_1_3) && _rootURIOverriden) {
91+
SPEC_VERSION = Spec.SPEC_1_1;
9292
_log.warn("Detected proxy without support for Feature flags spec {} version, will switch to spec version {}",
9393
SPEC_1_3, SPEC_1_1);
9494
_lastProxyCheckTimestamp = System.currentTimeMillis();
@@ -107,29 +107,40 @@ public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
107107
}
108108

109109
SplitChange splitChange = new SplitChange();
110-
if (specVersion.equals(Spec.SPEC_1_1)) {
110+
if (SPEC_VERSION.equals(Spec.SPEC_1_1)) {
111111
splitChange.featureFlags = convertBodyToOldSpec(response.body());
112-
splitChange.ruleBasedSegments = ChangeDto.createEmptyDto();
112+
splitChange.ruleBasedSegments = createEmptyDTO();
113113
} else {
114114
splitChange = Json.fromJson(response.body(), SplitChange.class);
115-
if (specVersion.equals(Spec.SPEC_1_3) && _lastProxyCheckTimestamp != 0) {
116-
splitChange.clearCache = true;
117-
_lastProxyCheckTimestamp = 0L;
118-
}
119115
}
120116
return splitChange;
121117
}
122118

119+
public Long getLastProxyCheckTimestamp() {
120+
return _lastProxyCheckTimestamp;
121+
}
122+
123+
public void setLastProxyCheckTimestamp(long lastProxyCheckTimestamp) {
124+
synchronized (_lock) {
125+
_lastProxyCheckTimestamp = lastProxyCheckTimestamp;
126+
}
127+
}
128+
129+
private ChangeDto<RuleBasedSegment> createEmptyDTO() {
130+
ChangeDto<RuleBasedSegment> dto = new ChangeDto<>();
131+
dto.d = new ArrayList<>();
132+
dto.t = -1;
133+
dto.s = -1;
134+
return dto;
135+
}
123136
private ChangeDto<Split> convertBodyToOldSpec(String body) {
124137
return Json.fromJson(body, SplitChangesOldPayloadDto.class).toChangeDTO();
125138
}
126139

127140
private URI buildURL(FetchOptions options, long since, long sinceRBS) throws URISyntaxException {
128-
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SPEC, "" + specVersion);
141+
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SPEC, "" + SPEC_VERSION);
129142
uriBuilder.addParameter(SINCE, "" + since);
130-
if (specVersion.equals(SPEC_1_3)) {
131-
uriBuilder.addParameter(RB_SINCE, "" + sinceRBS);
132-
}
143+
uriBuilder.addParameter(RB_SINCE, "" + sinceRBS);
133144
if (!options.flagSetsFilter().isEmpty()) {
134145
uriBuilder.addParameter(SETS, "" + options.flagSetsFilter());
135146
}

client/src/main/java/io/split/client/JsonLocalhostSplitChangeFetcher.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package io.split.client;
22

3-
import com.google.gson.JsonObject;
43
import com.google.gson.stream.JsonReader;
54
import io.split.client.dtos.ChangeDto;
65
import io.split.client.dtos.SplitChange;
7-
import io.split.client.dtos.SplitChangesOldPayloadDto;
86
import io.split.client.utils.InputStreamProvider;
97
import io.split.client.utils.Json;
108
import io.split.client.utils.LocalhostSanitizer;
@@ -22,8 +20,6 @@
2220
import java.util.ArrayList;
2321
import java.util.Arrays;
2422

25-
import static io.split.client.utils.Json.fromJson;
26-
2723
public class JsonLocalhostSplitChangeFetcher implements SplitChangeFetcher {
2824

2925
private static final Logger _log = LoggerFactory.getLogger(JsonLocalhostSplitChangeFetcher.class);
@@ -41,23 +37,13 @@ public JsonLocalhostSplitChangeFetcher(InputStreamProvider inputStreamProvider)
4137
public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
4238
try {
4339
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(_inputStreamProvider.get(), StandardCharsets.UTF_8)));
44-
if (checkOldSpec(new JsonReader(new BufferedReader(new InputStreamReader(_inputStreamProvider.get(), StandardCharsets.UTF_8))))) {
45-
SplitChange splitChange = new SplitChange();
46-
splitChange.featureFlags = Json.fromJson(jsonReader, SplitChangesOldPayloadDto.class).toChangeDTO();
47-
splitChange.ruleBasedSegments = ChangeDto.createEmptyDto();
48-
return splitChange;
49-
}
50-
SplitChange splitChange = fromJson(jsonReader, SplitChange.class);
40+
SplitChange splitChange = Json.fromJson(jsonReader, SplitChange.class);
5141
return processSplitChange(splitChange, since, sinceRBS);
5242
} catch (Exception e) {
5343
throw new IllegalStateException("Problem fetching splitChanges: " + e.getMessage(), e);
5444
}
5545
}
5646

57-
private boolean checkOldSpec(JsonReader jsonReader) {
58-
return Json.fromJson(jsonReader, JsonObject.class).has("splits");
59-
}
60-
6147
private SplitChange processSplitChange(SplitChange splitChange, long changeNumber, long changeNumberRBS) throws NoSuchAlgorithmException {
6248
SplitChange splitChangeToProcess = LocalhostSanitizer.sanitization(splitChange);
6349
// if the till is less than storage CN and different from the default till ignore the change

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ public CustomHeaderDecorator customHeaderDecorator() {
413413
}
414414

415415
public boolean isRootURIOverriden() {
416-
return _endpoint != SDK_ENDPOINT;
416+
return _endpoint == SDK_ENDPOINT;
417417
}
418418

419419
public CustomHttpModule alternativeHTTPModule() { return _alternativeHTTPModule; }

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,11 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
220220
// Segments
221221
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
222222

223-
224223
SplitParser splitParser = new SplitParser();
225224
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
226225
// SplitFetcher
227226
_splitFetcher = buildSplitFetcher(splitCache, splitParser, flagSetsFilter,
228-
ruleBasedSegmentParser, ruleBasedSegmentCache, config.isRootURIOverriden());
227+
ruleBasedSegmentParser, ruleBasedSegmentCache, config.isSdkEndpointOverridden());
229228

230229
// SplitSynchronizationTask
231230
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,

client/src/main/java/io/split/client/dtos/SplitChangesOldPayloadDto.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.gson.annotations.SerializedName;
44

5+
import java.util.ArrayList;
56
import java.util.List;
67

78
public class SplitChangesOldPayloadDto {
@@ -14,12 +15,20 @@ public class SplitChangesOldPayloadDto {
1415
@SerializedName("splits")
1516
public List<Split> d;
1617

17-
public ChangeDto<Split> toChangeDTO() {
18-
ChangeDto<Split> dto = new ChangeDto<>();
19-
dto.s = this.s;
20-
dto.t = this.t;
21-
dto.d = this.d;
22-
return dto;
23-
18+
public SplitChange toSplitChange() {
19+
SplitChange splitChange = new SplitChange();
20+
ChangeDto<Split> ff = new ChangeDto<>();
21+
ff.s = this.s;
22+
ff.t = this.t;
23+
ff.d = this.d;
24+
ChangeDto<RuleBasedSegment> rbs = new ChangeDto<>();
25+
rbs.d = new ArrayList<>();
26+
rbs.t = -1;
27+
rbs.s = -1;
28+
29+
splitChange.featureFlags = ff;
30+
splitChange.ruleBasedSegments = rbs;
31+
32+
return splitChange;
2433
}
2534
}

client/src/main/java/io/split/client/utils/Utils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.client.utils;
22

3+
import io.split.client.dtos.ChangeDto;
34
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
45
import org.apache.hc.core5.http.ContentType;
56
import org.apache.hc.core5.http.HttpEntity;
@@ -40,4 +41,8 @@ public static URI appendPath(URI root, String pathToAppend) throws URISyntaxExce
4041
String path = String.format("%s%s%s", root.getPath(), root.getPath().endsWith("/") ? "" : "/", pathToAppend);
4142
return new URIBuilder(root).setPath(path).build();
4243
}
44+
45+
public static <T> boolean checkExitConditions(ChangeDto<T> change, long cn) {
46+
return change.t < cn && change.t != -1;
47+
}
4348
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
118118
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
119119
return new SyncResult(false, remainingAttempts, fetchResult);
120120
}
121-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
122-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber())) {
121+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
122+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
123123
return new SyncResult(true, remainingAttempts, fetchResult);
124124
} else if (remainingAttempts <= 0) {
125125
return new SyncResult(false, remainingAttempts, fetchResult);
@@ -137,9 +137,15 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
137137
@Override
138138
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
139139

140-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
141-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) ||
142-
(ruleBasedSegmentChangeNumber == 0 && targetChangeNumber == 0)) {
140+
if (targetChangeNumber == null || targetChangeNumber == 0) {
141+
targetChangeNumber = _splitCacheProducer.getChangeNumber();
142+
}
143+
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
144+
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
145+
}
146+
147+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
148+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
143149
return;
144150
}
145151

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.split.engine.experiments;
22

3-
import io.split.Spec;
43
import io.split.client.dtos.ChangeDto;
54
import io.split.client.dtos.RuleBasedSegment;
65
import io.split.client.dtos.Split;
@@ -21,7 +20,6 @@
2120
import java.util.Set;
2221

2322
import static com.google.common.base.Preconditions.checkNotNull;
24-
2523
import static io.split.client.utils.FeatureFlagProcessor.processFeatureFlagChanges;
2624
import static io.split.client.utils.RuleBasedSegmentProcessor.processRuleBasedSegmentChanges;
2725

@@ -126,11 +124,6 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
126124
throw new IllegalStateException("SplitChange was null");
127125
}
128126

129-
if (change.clearCache) {
130-
_splitCacheProducer.clear();
131-
_ruleBasedSegmentCacheProducer.clear();
132-
}
133-
134127
if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) ||
135128
checkExitConditions(change.ruleBasedSegments, _ruleBasedSegmentCacheProducer.getChangeNumber())) {
136129
return segments;
@@ -156,7 +149,6 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
156149
// some other thread may have updated the shared state. exit
157150
return segments;
158151
}
159-
160152
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_parser, change.featureFlags.d, _flagSetsFilter);
161153
segments = featureFlagsToUpdate.getSegments();
162154
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(), change.featureFlags.t);

client/src/main/java/io/split/engine/sse/NotificationParserImp.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package io.split.engine.sse;
22

3+
import io.split.client.dtos.RuleBasedSegment;
4+
import io.split.client.dtos.Split;
35
import io.split.client.utils.Json;
46

57
import io.split.engine.sse.dtos.ControlNotification;
68
import io.split.engine.sse.dtos.ErrorNotification;
7-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
9+
import io.split.engine.sse.dtos.CommonChangeNotification;
810
import io.split.engine.sse.dtos.GenericNotificationData;
911
import io.split.engine.sse.dtos.IncomingNotification;
1012
import io.split.engine.sse.dtos.OccupancyNotification;
1113
import io.split.engine.sse.dtos.RawMessageNotification;
1214
import io.split.engine.sse.dtos.SegmentChangeNotification;
1315
import io.split.engine.sse.dtos.SplitKillNotification;
14-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
1516
import io.split.engine.sse.exceptions.EventParsingException;
1617

1718
public class NotificationParserImp implements NotificationParser {
@@ -48,9 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException
4849
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
4950
switch (genericNotificationData.getType()) {
5051
case SPLIT_UPDATE:
51-
return new FeatureFlagChangeNotification(genericNotificationData);
52+
return new CommonChangeNotification(genericNotificationData, Split.class);
5253
case RB_SEGMENT_UPDATE:
53-
return new RuleBasedSegmentChangeNotification(genericNotificationData);
54+
return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class);
5455
case SPLIT_KILL:
5556
return new SplitKillNotification(genericNotificationData);
5657
case SEGMENT_UPDATE:

client/src/main/java/io/split/engine/sse/NotificationProcessor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package io.split.engine.sse;
22

3-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
43
import io.split.engine.sse.dtos.IncomingNotification;
54
import io.split.engine.sse.dtos.SplitKillNotification;
65
import io.split.engine.sse.dtos.StatusNotification;
7-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
86

97
public interface NotificationProcessor {
108
void process(IncomingNotification notification);
11-
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
12-
void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification);
9+
void processUpdates(IncomingNotification notification);
1310
void processSplitKill(SplitKillNotification splitKillNotification);
1411
void processSegmentUpdate(long changeNumber, String segmentName);
1512
void processStatus(StatusNotification statusNotification);

0 commit comments

Comments
 (0)