Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
flagSetsFilter);
ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache);
_syncManager.start();

// DestroyOnShutDown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
//No-Op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
if (fetchResult.isSuccess()){
_log.debug("Refresh feature flags completed");
Expand Down
10 changes: 7 additions & 3 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
Expand All @@ -17,6 +18,7 @@
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
Expand Down Expand Up @@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer,
FlagSetsFilter flagSetsFilter) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
telemetryRuntimeProducer, flagSetsFilter);
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache,
RuleBasedSegmentParser ruleBasedSegmentParser) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);

Expand Down
11 changes: 9 additions & 2 deletions client/src/main/java/io/split/engine/common/SyncManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
Expand Down Expand Up @@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config,
SplitParser splitParser,
FlagSetsFilter flagSetsFilter) {
RuleBasedSegmentParser ruleBasedSegmentParser,
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
splitCacheProducer,
segmentCacheProducer,
ruleBasedSegmentCache,
config.streamingRetryDelay(),
config.streamingFetchMaxRetries(),
config.failedAttemptsBeforeLogging(),
Expand All @@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
config.getThreadFactory(),
splitParser,
splitCacheProducer,
flagSetsFilter);
flagSetsFilter,
ruleBasedSegmentCache,
ruleBasedSegmentParser);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
Expand Down
27 changes: 20 additions & 7 deletions client/src/main/java/io/split/engine/common/SynchronizerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
Expand All @@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
private final SplitFetcher _splitFetcher;
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
private final SplitCacheProducer _splitCacheProducer;
private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer;
private final SegmentCacheProducer segmentCacheProducer;
private final ImpressionsManager _impressionManager;
private final EventsTask _eventsTask;
Expand All @@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
SplitFetcher splitFetcher,
SplitCacheProducer splitCacheProducer,
SegmentCacheProducer segmentCacheProducer,
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer,
int onDemandFetchRetryDelayMs,
int onDemandFetchMaxRetries,
int failedAttemptsBeforeLogging,
Expand All @@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
_splitFetcher = checkNotNull(splitFetcher);
_segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
_splitCacheProducer = checkNotNull(splitCacheProducer);
_ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer);
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
Expand Down Expand Up @@ -103,7 +107,7 @@ private static class SyncResult {
private final FetchResult _fetchResult;
}

private SyncResult attemptSplitsSync(long targetChangeNumber,
private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber,
FetchOptions opts,
Function<Void, Long> nextWaitMs,
int maxRetries) {
Expand All @@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
return new SyncResult(false, remainingAttempts, fetchResult);
}
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
return new SyncResult(true, remainingAttempts, fetchResult);
} else if (remainingAttempts <= 0) {
return new SyncResult(false, remainingAttempts, fetchResult);
Expand All @@ -130,9 +135,17 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {

if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if (targetChangeNumber == null || targetChangeNumber == 0) {
targetChangeNumber = _splitCacheProducer.getChangeNumber();
}
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
}

if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
return;
}

Expand All @@ -142,7 +155,7 @@ public void refreshSplits(Long targetChangeNumber) {
.flagSetsFilter(_sets)
.build();

SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts,
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);

int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
Expand All @@ -157,7 +170,7 @@ public void refreshSplits(Long targetChangeNumber) {
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass,
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);

int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
Expand All @@ -175,7 +188,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(),
splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber(), 0L);
}
}

Expand Down
17 changes: 17 additions & 0 deletions client/src/main/java/io/split/engine/experiments/ParsedSplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import io.split.engine.matchers.AttributeMatcher;
import io.split.engine.matchers.RuleBasedSegmentMatcher;
import io.split.engine.matchers.UserDefinedSegmentMatcher;

import java.util.HashSet;
Expand Down Expand Up @@ -243,6 +244,15 @@ public Set<String> getSegmentsNames() {
.collect(Collectors.toSet());
}

public Set<String> getRuleBasedSegmentsNames() {
return parsedConditions().stream()
.flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream())
.filter(ParsedSplit::isRuleBasedSegmentMatcher)
.map(ParsedSplit::asRuleBasedSegmentMatcherForEach)
.map(RuleBasedSegmentMatcher::getSegmentName)
.collect(Collectors.toSet());
}

private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher;
}
Expand All @@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche
return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}

private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher;
}

private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) {
return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.split.engine.sse.dtos.RawMessageNotification;
import io.split.engine.sse.dtos.SegmentChangeNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
import io.split.engine.sse.exceptions.EventParsingException;

public class NotificationParserImp implements NotificationParser {
Expand Down Expand Up @@ -48,6 +49,8 @@ private IncomingNotification parseNotification(GenericNotificationData genericNo
switch (genericNotificationData.getType()) {
case SPLIT_UPDATE:
return new FeatureFlagChangeNotification(genericNotificationData);
case RB_SEGMENT_UPDATE:
return new RuleBasedSegmentChangeNotification(genericNotificationData);
case SPLIT_KILL:
return new SplitKillNotification(genericNotificationData);
case SEGMENT_UPDATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;

public interface NotificationProcessor {
void process(IncomingNotification notification);
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification);
void processSplitKill(SplitKillNotification splitKillNotification);
void processSegmentUpdate(long changeNumber, String segmentName);
void processStatus(StatusNotification statusNotification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;

Expand Down Expand Up @@ -41,6 +42,11 @@ public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNo
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
}

@Override
public void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification) {
_featureFlagsWorker.addToQueue(ruleBasedSegmentChangeNotification);
}

@Override
public void processSplitKill(SplitKillNotification splitKillNotification) {
_featureFlagsWorker.kill(splitKillNotification);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.split.engine.sse.dtos;

import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.sse.NotificationProcessor;
import io.split.engine.sse.enums.CompressType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.zip.DataFormatException;

import static io.split.engine.sse.utils.DecompressionUtil.gZipDecompress;
import static io.split.engine.sse.utils.DecompressionUtil.zLibDecompress;

public class CommonChangeNotification extends IncomingNotification {
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
private final long changeNumber;
private long previousChangeNumber;
private CompressType compressType;

public CommonChangeNotification(GenericNotificationData genericNotificationData, IncomingNotification.Type notificationType) {
super(notificationType, genericNotificationData.getChannel());
changeNumber = genericNotificationData.getChangeNumber();
if(genericNotificationData.getPreviousChangeNumber() != null) {
previousChangeNumber = genericNotificationData.getPreviousChangeNumber();
}
compressType = CompressType.from(genericNotificationData.getCompressType());
if (compressType == null || genericNotificationData.getFeatureFlagDefinition() == null) {
return;
}
try {
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getFeatureFlagDefinition());
switch (compressType) {
case GZIP:
decodedBytes = gZipDecompress(decodedBytes);
break;
case ZLIB:
decodedBytes = zLibDecompress(decodedBytes);
break;
}

updateDefinition(decodedBytes);
} catch (UnsupportedEncodingException | IllegalArgumentException e) {
_log.warn("Could not decode base64 data in definition", e);
} catch (DataFormatException d) {
_log.warn("Could not decompress definition with zlib algorithm", d);
} catch (IOException i) {
_log.warn("Could not decompress definition with gzip algorithm", i);
}
}

public long getChangeNumber() {
return changeNumber;
}
public long getPreviousChangeNumber() {
return previousChangeNumber;
}

public CompressType getCompressType() {
return compressType;
}

@Override
public void handler(NotificationProcessor notificationProcessor) {}

@Override
public String toString() {
return String.format("Type: %s; Channel: %s; ChangeNumber: %s", getType(), getChannel(), getChangeNumber());
}

public void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException {};
}
Loading