Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
flagSetsFilter);
ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache);
_syncManager.start();

// DestroyOnShutDown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
//No-Op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
if (fetchResult.isSuccess()){
_log.debug("Refresh feature flags completed");
Expand Down
10 changes: 7 additions & 3 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
Expand All @@ -17,6 +18,7 @@
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
Expand Down Expand Up @@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer,
FlagSetsFilter flagSetsFilter) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
telemetryRuntimeProducer, flagSetsFilter);
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache,
RuleBasedSegmentParser ruleBasedSegmentParser) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);

Expand Down
11 changes: 9 additions & 2 deletions client/src/main/java/io/split/engine/common/SyncManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
Expand Down Expand Up @@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config,
SplitParser splitParser,
FlagSetsFilter flagSetsFilter) {
RuleBasedSegmentParser ruleBasedSegmentParser,
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
splitCacheProducer,
segmentCacheProducer,
ruleBasedSegmentCache,
config.streamingRetryDelay(),
config.streamingFetchMaxRetries(),
config.failedAttemptsBeforeLogging(),
Expand All @@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
config.getThreadFactory(),
splitParser,
splitCacheProducer,
flagSetsFilter);
flagSetsFilter,
ruleBasedSegmentCache,
ruleBasedSegmentParser);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
Expand Down
27 changes: 20 additions & 7 deletions client/src/main/java/io/split/engine/common/SynchronizerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
Expand All @@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
private final SplitFetcher _splitFetcher;
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
private final SplitCacheProducer _splitCacheProducer;
private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer;
private final SegmentCacheProducer segmentCacheProducer;
private final ImpressionsManager _impressionManager;
private final EventsTask _eventsTask;
Expand All @@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
SplitFetcher splitFetcher,
SplitCacheProducer splitCacheProducer,
SegmentCacheProducer segmentCacheProducer,
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer,
int onDemandFetchRetryDelayMs,
int onDemandFetchMaxRetries,
int failedAttemptsBeforeLogging,
Expand All @@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
_splitFetcher = checkNotNull(splitFetcher);
_segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
_splitCacheProducer = checkNotNull(splitCacheProducer);
_ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer);
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
Expand Down Expand Up @@ -103,7 +107,7 @@ private static class SyncResult {
private final FetchResult _fetchResult;
}

private SyncResult attemptSplitsSync(long targetChangeNumber,
private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber,
FetchOptions opts,
Function<Void, Long> nextWaitMs,
int maxRetries) {
Expand All @@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
return new SyncResult(false, remainingAttempts, fetchResult);
}
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
return new SyncResult(true, remainingAttempts, fetchResult);
} else if (remainingAttempts <= 0) {
return new SyncResult(false, remainingAttempts, fetchResult);
Expand All @@ -130,9 +135,17 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {

if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if (targetChangeNumber == null || targetChangeNumber == 0) {
targetChangeNumber = _splitCacheProducer.getChangeNumber();
}
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
}

if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
return;
}

Expand All @@ -142,7 +155,7 @@ public void refreshSplits(Long targetChangeNumber) {
.flagSetsFilter(_sets)
.build();

SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts,
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);

int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
Expand All @@ -157,7 +170,7 @@ public void refreshSplits(Long targetChangeNumber) {
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass,
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);

int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
Expand All @@ -175,7 +188,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(),
splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber(), 0L);
}
}

Expand Down
17 changes: 17 additions & 0 deletions client/src/main/java/io/split/engine/experiments/ParsedSplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import io.split.engine.matchers.AttributeMatcher;
import io.split.engine.matchers.RuleBasedSegmentMatcher;
import io.split.engine.matchers.UserDefinedSegmentMatcher;

import java.util.HashSet;
Expand Down Expand Up @@ -243,6 +244,15 @@ public Set<String> getSegmentsNames() {
.collect(Collectors.toSet());
}

public Set<String> getRuleBasedSegmentsNames() {
return parsedConditions().stream()
.flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream())
.filter(ParsedSplit::isRuleBasedSegmentMatcher)
.map(ParsedSplit::asRuleBasedSegmentMatcherForEach)
.map(RuleBasedSegmentMatcher::getSegmentName)
.collect(Collectors.toSet());
}

private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher;
}
Expand All @@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche
return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}

private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher;
}

private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) {
return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.split.engine.sse;

import io.split.client.dtos.RuleBasedSegment;
import io.split.client.dtos.Split;
import io.split.client.utils.Json;

import io.split.engine.sse.dtos.ControlNotification;
import io.split.engine.sse.dtos.ErrorNotification;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.CommonChangeNotification;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.OccupancyNotification;
Expand Down Expand Up @@ -47,7 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
switch (genericNotificationData.getType()) {
case SPLIT_UPDATE:
return new FeatureFlagChangeNotification(genericNotificationData);
return new CommonChangeNotification(genericNotificationData, Split.class);
case RB_SEGMENT_UPDATE:
return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class);
case SPLIT_KILL:
return new SplitKillNotification(genericNotificationData);
case SEGMENT_UPDATE:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.split.engine.sse;

import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;

public interface NotificationProcessor {
void process(IncomingNotification notification);
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
void processUpdates(IncomingNotification notification);
void processSplitKill(SplitKillNotification splitKillNotification);
void processSegmentUpdate(long changeNumber, String segmentName);
void processStatus(StatusNotification statusNotification);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.split.engine.sse;

import com.google.common.annotations.VisibleForTesting;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
Expand Down Expand Up @@ -31,20 +30,19 @@ public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWork
return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker);
}

@Override
public void process(IncomingNotification notification) {
notification.handler(this);
public void processUpdates(IncomingNotification notification) {
_featureFlagsWorker.addToQueue(notification);
}

@Override
public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) {
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
public void process(IncomingNotification notification) {
notification.handler(this);
}

@Override
public void processSplitKill(SplitKillNotification splitKillNotification) {
_featureFlagsWorker.kill(splitKillNotification);
_featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder()
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
.changeNumber(splitKillNotification.getChangeNumber())
.channel(splitKillNotification.getChannel())
.build()));
Expand Down
Loading