Skip to content

Commit 6e12ff2

Browse files
authored
Merge branch 'rbs-localhost' into rbs-oldspec-fetcher
2 parents 9117c38 + f63699d commit 6e12ff2

28 files changed

+566
-256
lines changed

client/src/main/java/io/split/client/JsonLocalhostSplitChangeFetcher.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.split.client;
22

33
import com.google.gson.stream.JsonReader;
4-
import io.split.client.dtos.ChangeDto;
54
import io.split.client.dtos.SplitChange;
65
import io.split.client.utils.InputStreamProvider;
76
import io.split.client.utils.Json;
@@ -17,9 +16,10 @@
1716
import java.nio.charset.StandardCharsets;
1817
import java.security.MessageDigest;
1918
import java.security.NoSuchAlgorithmException;
20-
import java.util.ArrayList;
2119
import java.util.Arrays;
2220

21+
import static io.split.client.utils.Utils.checkExitConditions;
22+
2323
public class JsonLocalhostSplitChangeFetcher implements SplitChangeFetcher {
2424

2525
private static final Logger _log = LoggerFactory.getLogger(JsonLocalhostSplitChangeFetcher.class);
@@ -70,10 +70,6 @@ private SplitChange processSplitChange(SplitChange splitChange, long changeNumbe
7070
return splitChangeToProcess;
7171
}
7272

73-
private <T> boolean checkExitConditions(ChangeDto<T> change, long cn) {
74-
return change.t < cn && change.t != -1;
75-
}
76-
7773
private byte[] getStringDigest(String Json) throws NoSuchAlgorithmException {
7874
MessageDigest digest = MessageDigest.getInstance("SHA-1");
7975
digest.reset();

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
220220
// Segments
221221
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
222222

223-
224223
SplitParser splitParser = new SplitParser();
225224
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
226225
// SplitFetcher

client/src/main/java/io/split/client/utils/Utils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.client.utils;
22

3+
import io.split.client.dtos.ChangeDto;
34
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
45
import org.apache.hc.core5.http.ContentType;
56
import org.apache.hc.core5.http.HttpEntity;
@@ -40,4 +41,8 @@ public static URI appendPath(URI root, String pathToAppend) throws URISyntaxExce
4041
String path = String.format("%s%s%s", root.getPath(), root.getPath().endsWith("/") ? "" : "/", pathToAppend);
4142
return new URIBuilder(root).setPath(path).build();
4243
}
44+
45+
public static <T> boolean checkExitConditions(ChangeDto<T> change, long cn) {
46+
return change.t < cn && change.t != -1;
47+
}
4348
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
118118
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
119119
return new SyncResult(false, remainingAttempts, fetchResult);
120120
}
121-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
122-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber())) {
121+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
122+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
123123
return new SyncResult(true, remainingAttempts, fetchResult);
124124
} else if (remainingAttempts <= 0) {
125125
return new SyncResult(false, remainingAttempts, fetchResult);
@@ -137,9 +137,15 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
137137
@Override
138138
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
139139

140-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
141-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) ||
142-
(ruleBasedSegmentChangeNumber == 0 && targetChangeNumber == 0)) {
140+
if (targetChangeNumber == null || targetChangeNumber == 0) {
141+
targetChangeNumber = _splitCacheProducer.getChangeNumber();
142+
}
143+
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
144+
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
145+
}
146+
147+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
148+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
143149
return;
144150
}
145151

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.split.engine.experiments;
22

3-
import io.split.client.dtos.ChangeDto;
4-
import io.split.client.dtos.RuleBasedSegment;
5-
import io.split.client.dtos.Split;
63
import io.split.client.dtos.SplitChange;
74
import io.split.client.exceptions.UriTooLongException;
85
import io.split.client.interceptors.FlagSetsFilter;
@@ -22,6 +19,7 @@
2219
import static com.google.common.base.Preconditions.checkNotNull;
2320
import static io.split.client.utils.FeatureFlagProcessor.processFeatureFlagChanges;
2421
import static io.split.client.utils.RuleBasedSegmentProcessor.processRuleBasedSegmentChanges;
22+
import static io.split.client.utils.Utils.checkExitConditions;
2523

2624
/**
2725
* An ExperimentFetcher that refreshes experiment definitions periodically.
@@ -144,7 +142,7 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
144142

145143
synchronized (_lock) {
146144
// check state one more time.
147-
if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) &&
145+
if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) ||
148146
checkExitConditions(change.ruleBasedSegments, _ruleBasedSegmentCacheProducer.getChangeNumber())) {
149147
// some other thread may have updated the shared state. exit
150148
return segments;
@@ -163,8 +161,4 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
163161

164162
return segments;
165163
}
166-
167-
private <T> boolean checkExitConditions(ChangeDto<T> change, long cn) {
168-
return change.s != cn || change.t < cn;
169-
}
170164
}

client/src/main/java/io/split/engine/sse/NotificationParserImp.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package io.split.engine.sse;
22

3+
import io.split.client.dtos.RuleBasedSegment;
4+
import io.split.client.dtos.Split;
35
import io.split.client.utils.Json;
46

57
import io.split.engine.sse.dtos.ControlNotification;
68
import io.split.engine.sse.dtos.ErrorNotification;
7-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
9+
import io.split.engine.sse.dtos.CommonChangeNotification;
810
import io.split.engine.sse.dtos.GenericNotificationData;
911
import io.split.engine.sse.dtos.IncomingNotification;
1012
import io.split.engine.sse.dtos.OccupancyNotification;
1113
import io.split.engine.sse.dtos.RawMessageNotification;
1214
import io.split.engine.sse.dtos.SegmentChangeNotification;
1315
import io.split.engine.sse.dtos.SplitKillNotification;
14-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
1516
import io.split.engine.sse.exceptions.EventParsingException;
1617

1718
public class NotificationParserImp implements NotificationParser {
@@ -48,9 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException
4849
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
4950
switch (genericNotificationData.getType()) {
5051
case SPLIT_UPDATE:
51-
return new FeatureFlagChangeNotification(genericNotificationData);
52+
return new CommonChangeNotification(genericNotificationData, Split.class);
5253
case RB_SEGMENT_UPDATE:
53-
return new RuleBasedSegmentChangeNotification(genericNotificationData);
54+
return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class);
5455
case SPLIT_KILL:
5556
return new SplitKillNotification(genericNotificationData);
5657
case SEGMENT_UPDATE:

client/src/main/java/io/split/engine/sse/NotificationProcessor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package io.split.engine.sse;
22

3-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
43
import io.split.engine.sse.dtos.IncomingNotification;
54
import io.split.engine.sse.dtos.SplitKillNotification;
65
import io.split.engine.sse.dtos.StatusNotification;
7-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
86

97
public interface NotificationProcessor {
108
void process(IncomingNotification notification);
11-
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
12-
void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification);
9+
void processUpdates(IncomingNotification notification);
1310
void processSplitKill(SplitKillNotification splitKillNotification);
1411
void processSegmentUpdate(long changeNumber, String segmentName);
1512
void processStatus(StatusNotification statusNotification);

client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package io.split.engine.sse;
22

33
import com.google.common.annotations.VisibleForTesting;
4-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
54
import io.split.engine.sse.dtos.GenericNotificationData;
65
import io.split.engine.sse.dtos.IncomingNotification;
76
import io.split.engine.sse.dtos.SplitKillNotification;
87
import io.split.engine.sse.dtos.StatusNotification;
98
import io.split.engine.sse.dtos.SegmentQueueDto;
10-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
119
import io.split.engine.sse.workers.FeatureFlagsWorker;
1210
import io.split.engine.sse.workers.Worker;
1311

@@ -32,25 +30,19 @@ public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWork
3230
return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker);
3331
}
3432

35-
@Override
36-
public void process(IncomingNotification notification) {
37-
notification.handler(this);
38-
}
39-
40-
@Override
41-
public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) {
42-
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
33+
public void processUpdates(IncomingNotification notification) {
34+
_featureFlagsWorker.addToQueue(notification);
4335
}
4436

4537
@Override
46-
public void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification) {
47-
_featureFlagsWorker.addToQueue(ruleBasedSegmentChangeNotification);
38+
public void process(IncomingNotification notification) {
39+
notification.handler(this);
4840
}
4941

5042
@Override
5143
public void processSplitKill(SplitKillNotification splitKillNotification) {
5244
_featureFlagsWorker.kill(splitKillNotification);
53-
_featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder()
45+
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
5446
.changeNumber(splitKillNotification.getChangeNumber())
5547
.channel(splitKillNotification.getChannel())
5648
.build()));

client/src/main/java/io/split/engine/sse/dtos/CommonChangeNotification.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.sse.dtos;
22

3+
import io.split.client.utils.Json;
34
import io.split.engine.segments.SegmentSynchronizationTaskImp;
45
import io.split.engine.sse.NotificationProcessor;
56
import io.split.engine.sse.enums.CompressType;
@@ -14,24 +15,29 @@
1415
import static io.split.engine.sse.utils.DecompressionUtil.gZipDecompress;
1516
import static io.split.engine.sse.utils.DecompressionUtil.zLibDecompress;
1617

17-
public class CommonChangeNotification extends IncomingNotification {
18+
public class CommonChangeNotification<Y> extends IncomingNotification {
1819
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
1920
private final long changeNumber;
2021
private long previousChangeNumber;
2122
private CompressType compressType;
23+
private Y definition;
24+
private Class _definitionClass;
2225

23-
public CommonChangeNotification(GenericNotificationData genericNotificationData, IncomingNotification.Type notificationType) {
24-
super(notificationType, genericNotificationData.getChannel());
26+
public CommonChangeNotification(GenericNotificationData genericNotificationData,
27+
Class definitionClass) {
28+
super(genericNotificationData.getType(), genericNotificationData.getChannel());
2529
changeNumber = genericNotificationData.getChangeNumber();
30+
_definitionClass = definitionClass;
31+
2632
if(genericNotificationData.getPreviousChangeNumber() != null) {
2733
previousChangeNumber = genericNotificationData.getPreviousChangeNumber();
2834
}
2935
compressType = CompressType.from(genericNotificationData.getCompressType());
30-
if (compressType == null || genericNotificationData.getFeatureFlagDefinition() == null) {
36+
if (compressType == null || genericNotificationData.getDefinition() == null) {
3137
return;
3238
}
3339
try {
34-
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getFeatureFlagDefinition());
40+
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getDefinition());
3541
switch (compressType) {
3642
case GZIP:
3743
decodedBytes = gZipDecompress(decodedBytes);
@@ -57,18 +63,25 @@ public long getChangeNumber() {
5763
public long getPreviousChangeNumber() {
5864
return previousChangeNumber;
5965
}
60-
6166
public CompressType getCompressType() {
6267
return compressType;
6368
}
6469

70+
public Y getDefinition() {
71+
return definition;
72+
}
73+
6574
@Override
66-
public void handler(NotificationProcessor notificationProcessor) {}
75+
public void handler(NotificationProcessor notificationProcessor) {
76+
notificationProcessor.processUpdates(this);
77+
}
6778

6879
@Override
6980
public String toString() {
7081
return String.format("Type: %s; Channel: %s; ChangeNumber: %s", getType(), getChannel(), getChangeNumber());
7182
}
7283

73-
public void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException {};
84+
private void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException {
85+
definition = (Y) Json.fromJson(new String(decodedBytes, "UTF-8"), _definitionClass);
86+
}
7487
}

client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)