Skip to content

Commit 8ca7e57

Browse files
Add putIfAbsent to IAssignmentCache (#148)
This resolves a concurrency issue that allowed multiple AssignmentLogger callbacks on high contention first-experiment-access.
1 parent 4fa7306 commit 8ca7e57

File tree

4 files changed

+102
-7
lines changed

4 files changed

+102
-7
lines changed

src/main/java/cloud/eppo/BaseEppoClient.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,11 @@ protected EppoValue getTypedAssignment(
267267
boolean logAssignment = true;
268268
AssignmentCacheEntry cacheEntry = AssignmentCacheEntry.fromVariationAssignment(assignment);
269269
if (assignmentCache != null) {
270-
if (assignmentCache.hasEntry(cacheEntry)) {
271-
logAssignment = false;
272-
}
270+
logAssignment = assignmentCache.putIfAbsent(cacheEntry);
273271
}
274272

275273
if (logAssignment) {
276274
assignmentLogger.logAssignment(assignment);
277-
278-
if (assignmentCache != null) {
279-
assignmentCache.put(cacheEntry);
280-
}
281275
}
282276

283277
} catch (Exception e) {

src/main/java/cloud/eppo/api/AbstractAssignmentCache.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public interface CacheDelegate {
1717
void put(String cacheKey, @NotNull String serializedEntry);
1818

1919
@Nullable String get(String cacheKey);
20+
21+
boolean putIfAbsent(String cacheKey, @NotNull String serializedEntry);
2022
}
2123

2224
protected final CacheDelegate delegate;
@@ -38,6 +40,19 @@ public void put(String cacheKey, @NotNull String serializedEntry) {
3840
public String get(String cacheKey) {
3941
return delegate.get(cacheKey);
4042
}
43+
44+
@Override
45+
public boolean putIfAbsent(String cacheKey, @NotNull String serializedEntry) {
46+
boolean hadNoPreviousEntry;
47+
synchronized (delegate) {
48+
String entry = delegate.get(cacheKey);
49+
hadNoPreviousEntry = entry == null;
50+
if (hadNoPreviousEntry) {
51+
delegate.put(cacheKey, serializedEntry);
52+
}
53+
}
54+
return hadNoPreviousEntry;
55+
}
4156
});
4257
}
4358

@@ -55,4 +70,9 @@ private String get(AssignmentCacheKey key) {
5570
public void put(AssignmentCacheEntry entry) {
5671
delegate.put(entry.getKeyString(), entry.getValueKeyString());
5772
}
73+
74+
@Override
75+
public boolean putIfAbsent(AssignmentCacheEntry entry) {
76+
return delegate.putIfAbsent(entry.getKeyString(), entry.getValueKeyString());
77+
}
5878
}

src/main/java/cloud/eppo/api/IAssignmentCache.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
* determine both presence and uniqueness of the cached value.
88
*/
99
public interface IAssignmentCache {
10+
/**
11+
* Puts the entry into the cache, overwriting an existing entry.
12+
*/
1013
void put(AssignmentCacheEntry entry);
1114

1215
/**
@@ -15,4 +18,10 @@ public interface IAssignmentCache {
1518
* comparing the `getValueKeyString()` method results.
1619
*/
1720
boolean hasEntry(AssignmentCacheEntry entry);
21+
22+
/**
23+
* Puts the entry into the cache if none exists.
24+
* @return true if no previous cache entry exists, false otherwise
25+
*/
26+
boolean putIfAbsent(AssignmentCacheEntry entry);
1827
}

src/test/java/cloud/eppo/BaseEppoClientTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
import java.util.*;
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionException;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ThreadFactory;
30+
import java.util.concurrent.TimeUnit;
2631
import java.util.concurrent.atomic.AtomicInteger;
2732
import java.util.stream.Stream;
2833
import okhttp3.mockwebserver.MockResponse;
2934
import okhttp3.mockwebserver.MockWebServer;
3035
import okhttp3.mockwebserver.RecordedRequest;
3136
import org.apache.commons.io.FileUtils;
37+
import org.jetbrains.annotations.NotNull;
3238
import org.junit.jupiter.api.BeforeEach;
3339
import org.junit.jupiter.api.Test;
3440
import org.junit.jupiter.params.ParameterizedTest;
@@ -557,6 +563,72 @@ public void testAssignmentEventCorrectlyDeduplicated() {
557563
verify(mockAssignmentLogger, times(3)).logAssignment(any(Assignment.class));
558564
}
559565

566+
@Test
567+
public void testAssignmentEventCorrectlyDeduplicatedFromBackgroundThreads() {
568+
initClientWithAssignmentCache(new LRUInMemoryAssignmentCache(1024));
569+
570+
Attributes subjectAttributes = new Attributes();
571+
subjectAttributes.put("number", EppoValue.valueOf("123456789"));
572+
573+
int numThreads = 10;
574+
final CountDownLatch threadStartCountDownLatch = new CountDownLatch(numThreads);
575+
final CountDownLatch getAssignmentStartCountDownLatch = new CountDownLatch(1);
576+
final List<Integer> assignments = Collections.synchronizedList(Arrays.asList(new Integer[numThreads]));
577+
try (ExecutorService pool = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
578+
private final AtomicInteger threadIndexAtomicInteger = new AtomicInteger(0);
579+
@Override
580+
public Thread newThread(@NotNull Runnable runnable) {
581+
final int threadIndex = threadIndexAtomicInteger.getAndIncrement();
582+
return new Thread(runnable, "testAssignmentEventCorrectlyDeduplicatedFromBackgroundThreads-" + threadIndex);
583+
}
584+
})) {
585+
for (int i = 0; i < numThreads; i += 1) {
586+
final int threadIndex = i;
587+
pool.execute(
588+
() -> {
589+
threadStartCountDownLatch.countDown();
590+
boolean shouldStart;
591+
try {
592+
shouldStart = getAssignmentStartCountDownLatch.await(1000, TimeUnit.SECONDS);
593+
} catch (InterruptedException ignored) {
594+
shouldStart = false;
595+
}
596+
final Integer assignment;
597+
if (shouldStart) {
598+
assignment = eppoClient.getIntegerAssignment("numeric-one-of", "alice", subjectAttributes, 0);
599+
} else {
600+
assignment = null;
601+
}
602+
603+
assignments.set(threadIndex, assignment);
604+
}
605+
);
606+
}
607+
608+
boolean shouldStart;
609+
try {
610+
shouldStart = threadStartCountDownLatch.await(2, TimeUnit.SECONDS);
611+
} catch (InterruptedException ignored) {
612+
shouldStart = false;
613+
}
614+
615+
assertTrue(shouldStart, "All worker threads did not start");
616+
getAssignmentStartCountDownLatch.countDown();
617+
}
618+
619+
final List<Integer> expectedAssignments;
620+
{
621+
final Integer[] expectedAssignmentsArray = new Integer[numThreads];
622+
// `2` matches the attribute `number` value of "123456789"
623+
Arrays.fill(expectedAssignmentsArray, 2);
624+
expectedAssignments = Arrays.asList(expectedAssignmentsArray);
625+
}
626+
assertEquals(expectedAssignments, assignments);
627+
628+
// `logAssignment` should be called only once.
629+
verify(mockAssignmentLogger, times(1)).logAssignment(any(Assignment.class));
630+
}
631+
560632
@Test
561633
public void testAssignmentLogErrorNonFatal() {
562634
initClient();

0 commit comments

Comments
 (0)