Skip to content
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.client.impressions;

import com.google.common.collect.Lists;
import io.split.client.dtos.UniqueKeys;
import io.split.client.impressions.filters.BloomFilterImp;
import io.split.client.impressions.filters.Filter;
Expand All @@ -25,8 +26,9 @@
public class UniqueKeysTrackerImp implements UniqueKeysTracker{
private static final Logger _log = LoggerFactory.getLogger(UniqueKeysTrackerImp.class);
private static final double MARGIN_ERROR = 0.01;
private static final int MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS = 30000;
private static final int MAX_UNIQUE_KEYS_POST_SIZE = 5000;
private static final int MAX_AMOUNT_OF_KEYS = 10000000;
private int trackerKeysSize = 0;
private FilterAdapter filterAdapter;
private final TelemetrySynchronizer _telemetrySynchronizer;
private final ScheduledExecutorService _uniqueKeysSyncScheduledExecutorService;
Expand Down Expand Up @@ -59,10 +61,11 @@ public boolean track(String featureFlagName, String key) {
(feature, current) -> {
HashSet<String> keysByFeature = Optional.ofNullable(current).orElse(new HashSet<>());
keysByFeature.add(key);
trackerKeysSize++;
return keysByFeature;
});
_logger.debug("The feature flag " + featureFlagName + " and key " + key + " was added");
if (uniqueKeysTracker.size() >= MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS){
if (trackerKeysSize >= MAX_UNIQUE_KEYS_POST_SIZE){
_logger.warn("The UniqueKeysTracker size reached the maximum limit");
try {
sendUniqueKeys();
Expand Down Expand Up @@ -107,6 +110,7 @@ public HashMap<String,HashSet<String>> popAll(){
HashSet<String> value = uniqueKeysTracker.remove(key);
toReturn.put(key, value);
}
trackerKeysSize = 0;
return toReturn;
}

Expand All @@ -115,26 +119,79 @@ private void sendUniqueKeys(){
_log.debug("SendUniqueKeys already running");
return;
}

try {
if (uniqueKeysTracker.size() == 0) {
_log.debug("The Unique Keys Tracker is empty");
return;
}

HashMap<String, HashSet<String>> uniqueKeysHashMap = popAll();
List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
uniqueKeysFromPopAll.add(uniqueKey);
}
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(uniqueKeysFromPopAll));
uniqueKeysFromPopAll = capChunksToMaxSize(uniqueKeysFromPopAll);

for (List<UniqueKeys.UniqueKey> chunk : getChunks(uniqueKeysFromPopAll)) {
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(chunk));
}
} finally {
sendGuard.set(false);
}
}

private List<UniqueKeys.UniqueKey> capChunksToMaxSize(List<UniqueKeys.UniqueKey> uniqeKeys) {
List<UniqueKeys.UniqueKey> finalChunk = new ArrayList<>();
for (UniqueKeys.UniqueKey uniqueKey : uniqeKeys) {
if (uniqueKey.keysDto.size() > MAX_UNIQUE_KEYS_POST_SIZE) {
for(List<String> subChunk : Lists.partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) {
finalChunk.add(new UniqueKeys.UniqueKey(uniqueKey.featureName, subChunk));
}
continue;
}
finalChunk.add(uniqueKey);
}
return finalChunk;
}

private List<List<UniqueKeys.UniqueKey>> getChunks(List<UniqueKeys.UniqueKey> uniqeKeys) {
List<List<UniqueKeys.UniqueKey>> chunks = new ArrayList<>();
List<UniqueKeys.UniqueKey> intermediateChunk = new ArrayList<>();
for (UniqueKeys.UniqueKey uniqueKey : uniqeKeys) {
if ((getChunkSize(intermediateChunk) + uniqueKey.keysDto.size()) > MAX_UNIQUE_KEYS_POST_SIZE) {
chunks.add(intermediateChunk);
intermediateChunk = new ArrayList<>();
}
intermediateChunk.add(uniqueKey);
}
if (!intermediateChunk.isEmpty()) {
chunks.add(intermediateChunk);
}
return chunks;
}

private int getChunkSize(List<UniqueKeys.UniqueKey> uniqueKeysChunk) {
int totalSize = 0;
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeysChunk) {
totalSize += uniqueKey.keysDto.size();
}
return totalSize;
}

private int getTrackerKeysSize() {
int totalSize = 0;
for (Map.Entry<String,HashSet<String>> item : uniqueKeysTracker.entrySet()) {
totalSize += item.getValue().size();
}
return totalSize;
}

private interface ExecuteUniqueKeysAction{
void execute();
}

private class ExecuteCleanFilter implements ExecuteUniqueKeysAction {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package io.split.client.impressions;

import io.split.client.dtos.UniqueKeys;
import io.split.telemetry.synchronizer.TelemetryInMemorySubmitter;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

public class UniqueKeysTrackerImpTest {
private static TelemetrySynchronizer _telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
Expand Down Expand Up @@ -100,4 +107,71 @@ public void testStopSynchronization() throws Exception {
uniqueKeysTrackerImp.stop();
Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeUniqueKeys(Mockito.anyObject());
}

@Test
public void testUniqueKeysChunks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_telemetrySynchronizer, 10000, 10000, null);
HashMap<String, HashSet<String>> uniqueKeysHashMap = new HashMap<>();
HashSet<String> feature1 = new HashSet<>();
HashSet<String> feature2 = new HashSet<>();
HashSet<String> feature3 = new HashSet<>();
HashSet<String> feature4 = new HashSet<>();
HashSet<String> feature5 = new HashSet<>();
for (Integer i=1; i<6000; i++) {
if (i <= 1000) {
feature1.add("key" + i);
}
if (i <= 2000) {
feature2.add("key" + i);
}
if (i <= 3000) {
feature3.add("key" + i);
}
if (i <= 4000) {
feature4.add("key" + i);
}
feature5.add("key" + i);
}
uniqueKeysHashMap.put("feature1", feature1);
uniqueKeysHashMap.put("feature2", feature2);
uniqueKeysHashMap.put("feature3", feature3);
uniqueKeysHashMap.put("feature4", feature4);
uniqueKeysHashMap.put("feature5", feature5);

List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
uniqueKeysFromPopAll.add(uniqueKey);
}
Method methodCapChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("capChunksToMaxSize", List.class);
methodCapChunks.setAccessible(true);
uniqueKeysFromPopAll = (List<UniqueKeys.UniqueKey>)methodCapChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);

Method methodGetChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("getChunks", List.class);
methodGetChunks.setAccessible(true);
List<List<UniqueKeys.UniqueKey>> keysChunks = (List<List<UniqueKeys.UniqueKey>>) methodGetChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
for (List<UniqueKeys.UniqueKey> chunk : keysChunks) {
int chunkSize = 0;
for (UniqueKeys.UniqueKey keys : chunk) {
chunkSize += keys.keysDto.size();
}
Assert.assertTrue(chunkSize <= 5000);
}
}

@Test
public void testTrackReachMaxKeys() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(telemetrySynchronizer, 10000, 10000, null);
for (int i=1; i<6000; i++) {
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1", "key" + i));
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2", "key" + i));
}
Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeUniqueKeys(Mockito.anyObject());

Field getTrackerSize = uniqueKeysTrackerImp.getClass().getDeclaredField("trackerKeysSize");
getTrackerSize.setAccessible(true);
int trackerSize = (int) getTrackerSize.get(uniqueKeysTrackerImp);
Assert.assertTrue(trackerSize == 1998);
}
}