Skip to content

Commit c606a13

Browse files
authored
Fix evict next (#214)
1 parent 81e974c commit c606a13

File tree

4 files changed

+57
-28
lines changed

4 files changed

+57
-28
lines changed

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@
2727
import com.uber.cadence.internal.metrics.MetricsType;
2828
import com.uber.m3.tally.Scope;
2929
import java.util.Objects;
30+
import java.util.Random;
3031
import java.util.UUID;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
3335

3436
public final class DeciderCache {
3537
private final String evictionEntryId = UUID.randomUUID().toString();
3638
private final int maxCacheSize;
3739
private final Scope metricsScope;
3840
private LoadingCache<String, WeightedCacheEntry<Decider>> cache;
39-
private static final Logger log = LoggerFactory.getLogger(DeciderCache.class);
41+
private Lock evictionLock = new ReentrantLock();
42+
Random rand = new Random();
4043

4144
public DeciderCache(int maxCacheSize, Scope scope) {
4245
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
@@ -45,6 +48,7 @@ public DeciderCache(int maxCacheSize, Scope scope) {
4548
this.cache =
4649
CacheBuilder.newBuilder()
4750
.maximumWeight(maxCacheSize)
51+
.concurrencyLevel(1)
4852
.weigher(
4953
(Weigher<String, WeightedCacheEntry<Decider>>) (key, value) -> value.getWeight())
5054
.removalListener(
@@ -70,7 +74,7 @@ public Decider getOrCreate(
7074
String runId = decisionTask.getWorkflowExecution().getRunId();
7175
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
7276
if (isFullHistory(decisionTask)) {
73-
cache.invalidate(runId);
77+
invalidate(decisionTask);
7478
return cache.get(
7579
runId, () -> new WeightedCacheEntry<>(createReplayDecider.apply(decisionTask), 1))
7680
.entry;
@@ -89,22 +93,43 @@ public Decider getUnchecked(String runId) throws Exception {
8993
}
9094
}
9195

92-
public void evictNext() {
93-
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
94-
int remainingSpace = (int) (maxCacheSize - cache.size());
95-
// Force eviction to happen
96-
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
97-
invalidate(evictionEntryId);
96+
public void evictNext() throws InterruptedException {
97+
// Timeout is to guard against workflows trying to evict each other.
98+
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
99+
return;
100+
}
101+
try {
102+
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
103+
int remainingSpace = (int) (maxCacheSize - cache.size());
104+
// Force eviction to happen. This assumes a concurrency level of 1 which implies a single
105+
// underlying segment and lock. If higher concurrency levels are assumed this may not work
106+
// since
107+
// the weight could be greater than the segment size and put will simply noop.
108+
// ConcurrenyLevel limits cache modification but reads and cache loading computations still
109+
// have concurrently.
110+
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
111+
invalidate(evictionEntryId);
112+
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
113+
} finally {
114+
evictionLock.unlock();
115+
}
98116
}
99117

100-
public void invalidate(PollForDecisionTaskResponse decisionTask) {
118+
public void invalidate(PollForDecisionTaskResponse decisionTask) throws InterruptedException {
101119
String runId = decisionTask.getWorkflowExecution().getRunId();
102120
invalidate(runId);
103121
}
104122

105-
public void invalidate(String runId) {
106-
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
107-
cache.invalidate(runId);
123+
private void invalidate(String runId) throws InterruptedException {
124+
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
125+
return;
126+
}
127+
try {
128+
cache.invalidate(runId);
129+
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
130+
} finally {
131+
evictionLock.unlock();
132+
}
108133
}
109134

110135
public long size() {

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,13 @@ public void start() {
236236
.getMetricsScope()
237237
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
238238
.inc(1);
239-
cache.evictNext();
239+
try {
240+
if (cache != null) {
241+
cache.evictNext();
242+
}
243+
} catch (InterruptedException e1) {
244+
log.warn("Unable to evict cache", e1);
245+
}
240246
}
241247

242248
try {

src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import static junit.framework.TestCase.assertEquals;
2121
import static org.junit.Assert.assertNotSame;
2222
import static org.junit.Assert.fail;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Matchers.anyInt;
24+
import static org.mockito.Matchers.eq;
25+
import static org.mockito.Mockito.*;
2626

2727
import com.uber.cadence.HistoryEvent;
2828
import com.uber.cadence.PollForDecisionTaskResponse;
@@ -160,10 +160,10 @@ public void evictNextWillInvalidateTheNextEntryInLineToBeEvicted() throws Except
160160
.build();
161161
StatsReporter reporter = mock(StatsReporter.class);
162162
Scope scope =
163-
new RootScopeBuilder().reporter(reporter).reportEvery(Duration.ofMillis(10)).tagged(tags);
163+
new RootScopeBuilder().reporter(reporter).reportEvery(Duration.ofMillis(100)).tagged(tags);
164164

165165
// Arrange
166-
DeciderCache replayDeciderCache = new DeciderCache(10, scope);
166+
DeciderCache replayDeciderCache = new DeciderCache(50, scope);
167167
PollForDecisionTaskResponse decisionTask1 =
168168
HistoryUtils.generateDecisionTaskWithInitialHistory();
169169
PollForDecisionTaskResponse decisionTask2 =
@@ -187,8 +187,9 @@ public void evictNextWillInvalidateTheNextEntryInLineToBeEvicted() throws Except
187187

188188
// Wait for reporter
189189
Thread.sleep(600);
190-
verify(reporter, times(1))
191-
.reportCounter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION, tags, 1);
190+
verify(reporter, atLeastOnce())
191+
.reportCounter(
192+
eq(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION), eq(tags), anyInt());
192193
}
193194

194195
private void assertCacheIsEmpty(DeciderCache cache, String runId) throws Exception {

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import static junit.framework.TestCase.assertTrue;
2323
import static junit.framework.TestCase.fail;
2424
import static org.junit.Assert.assertFalse;
25-
import static org.mockito.Mockito.mock;
26-
import static org.mockito.Mockito.times;
27-
import static org.mockito.Mockito.verify;
28-
import static org.mockito.Mockito.when;
25+
import static org.mockito.Mockito.*;
2926

3027
import com.uber.cadence.Decision;
3128
import com.uber.cadence.PollForDecisionTaskResponse;
@@ -718,8 +715,8 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
718715
assertEquals(0, cache.size()); // cache was evicted
719716
// Wait for reporter
720717
Thread.sleep(600);
721-
verify(reporter, times(1))
722-
.reportCounter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION, tags, 1);
718+
verify(reporter, atLeastOnce())
719+
.reportCounter(eq(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION), eq(tags), anyInt());
723720
}
724721

725722
@Test

0 commit comments

Comments
 (0)