Skip to content

Commit a0b0ac2

Browse files
authored
Feature version header (#226)
* Added LibraryVersion and LibraryFeature headers * Added full history loading when sticky task is received for a workflow not in cache
1 parent 1a72d9f commit a0b0ac2

File tree

6 files changed

+177
-37
lines changed

6 files changed

+177
-37
lines changed

build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ compileJava {
6868
".*/generated-sources/.*" << "-Werror"
6969
}
7070

71+
// Generation version.properties for value to be included into the request header
72+
task createProperties(dependsOn: processResources) {
73+
doLast {
74+
def subdir = new File("$buildDir/resources/main/com/uber/cadence/")
75+
if( !subdir.exists() ) {
76+
subdir.mkdirs()
77+
}
78+
new File("$buildDir/resources/main/com/uber/cadence/version.properties").withWriter { w ->
79+
Properties p = new Properties()
80+
p['cadence-client-version'] = project.version.toString()
81+
p.store w, null
82+
}
83+
}
84+
}
85+
86+
classes {
87+
dependsOn createProperties
88+
}
89+
7190
compileTestJava {
7291
options.encoding = 'UTF-8'
7392
options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-XepExcludedPaths:" +
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.util.Properties;
23+
24+
/**
25+
* below are the metadata which will be embedded as part of headers in every rpc call made by this
26+
* client to cadence server.
27+
*
28+
* <p>Update to the metadata below is typically done by the cadence team as part of a major feature
29+
* or behavior change
30+
*/
31+
public class Version {
32+
33+
/**
34+
* Library Version is a semver that represents the version of this cadence client library. This
35+
* represent API changes visibile to Cadence client side library consumers. I.e. developers that
36+
* are writing workflows. So every time we change API that can affect them we have to change this
37+
* number. Format: MAJOR.MINOR.PATCH
38+
*/
39+
public static final String LIBRARY_VERSION;
40+
41+
/**
42+
* Feature Version is a semver that represents the feature set of this cadence client library
43+
* support. This can be used for client capibility check, on Cadence server, for backward
44+
* compatibility Format: MAJOR.MINOR.PATCH
45+
*/
46+
public static final String FEATURE_VERSION = "1.0.0";
47+
48+
static {
49+
// Load version from version.properties generated by Gradle into build/resources/main directory.
50+
Properties prop = new Properties();
51+
InputStream in = Version.class.getResourceAsStream("/com/uber/cadence/version.properties");
52+
if (in == null) {
53+
LIBRARY_VERSION = "UNKNOWN";
54+
} else {
55+
String version = null;
56+
try {
57+
try {
58+
prop.load(in);
59+
version = prop.getProperty("cadence-client-version");
60+
} finally {
61+
in.close();
62+
}
63+
} catch (IOException e) {
64+
if (version == null) {
65+
version = "UNKNOWN";
66+
}
67+
}
68+
LIBRARY_VERSION = version;
69+
}
70+
}
71+
}

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.google.common.base.Preconditions;
21+
import com.google.common.base.Throwables;
2122
import com.google.common.cache.CacheBuilder;
2223
import com.google.common.cache.CacheLoader;
2324
import com.google.common.cache.LoadingCache;
25+
import com.google.common.util.concurrent.ExecutionError;
26+
import com.google.common.util.concurrent.UncheckedExecutionException;
2427
import com.uber.cadence.PollForDecisionTaskResponse;
2528
import com.uber.cadence.internal.common.ThrowableFunc1;
2629
import com.uber.cadence.internal.metrics.MetricsType;
@@ -30,6 +33,7 @@
3033
import java.util.Random;
3134
import java.util.Set;
3235
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3337
import java.util.concurrent.locks.Lock;
3438
import java.util.concurrent.locks.ReentrantLock;
3539
import org.slf4j.Logger;
@@ -75,18 +79,26 @@ public Decider getOrCreate(
7579
invalidate(decisionTask);
7680
return cache.get(runId, () -> createReplayDecider.apply(decisionTask));
7781
}
78-
return getUnchecked(runId);
79-
}
80-
81-
public Decider getUnchecked(String runId) throws Exception {
82+
AtomicBoolean miss = new AtomicBoolean();
83+
Decider result = null;
8284
try {
83-
Decider cachedDecider = cache.getUnchecked(runId);
84-
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
85-
return cachedDecider;
86-
} catch (CacheLoader.InvalidCacheLoadException e) {
87-
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
88-
throw new EvictedException(runId);
85+
result =
86+
cache.get(
87+
runId,
88+
() -> {
89+
miss.set(true);
90+
return createReplayDecider.apply(decisionTask);
91+
});
92+
} catch (UncheckedExecutionException | ExecutionError e) {
93+
Throwables.throwIfUnchecked(e.getCause());
94+
} finally {
95+
if (miss.get()) {
96+
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
97+
} else {
98+
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
99+
}
89100
}
101+
return result;
90102
}
91103

92104
public void evictAny(String runId) throws InterruptedException {

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ public boolean isAnyTypeSupported() {
210210

211211
private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws Exception {
212212
WorkflowType workflowType = decisionTask.getWorkflowType();
213+
List<HistoryEvent> events = decisionTask.getHistory().getEvents();
214+
// Sticky decision task with partial history
215+
if (events.isEmpty() || events.get(0).getEventId() > 1) {
216+
GetWorkflowExecutionHistoryRequest getHistoryRequest =
217+
new GetWorkflowExecutionHistoryRequest()
218+
.setDomain(domain)
219+
.setExecution(decisionTask.getWorkflowExecution());
220+
GetWorkflowExecutionHistoryResponse getHistoryResponse =
221+
service.GetWorkflowExecutionHistory(getHistoryRequest);
222+
decisionTask.setHistory(getHistoryResponse.getHistory());
223+
decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken());
224+
}
213225
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
214226
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
215227
return new ReplayDecider(

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
7575
import com.uber.cadence.WorkflowService;
7676
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
77+
import com.uber.cadence.internal.Version;
7778
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
7879
import com.uber.cadence.internal.metrics.MetricsType;
7980
import com.uber.cadence.internal.metrics.NoopScope;
@@ -164,7 +165,7 @@ private ClientOptions(Builder builder) {
164165
}
165166
this.metricsScope = builder.metricsScope;
166167
if (builder.transportHeaders != null) {
167-
this.transportHeaders = builder.transportHeaders;
168+
this.transportHeaders = ImmutableMap.copyOf(builder.transportHeaders);
168169
} else {
169170
this.transportHeaders = ImmutableMap.of();
170171
}
@@ -358,7 +359,13 @@ public WorkflowServiceTChannel(String host, int port, ClientOptions options) {
358359
ArrayList<InetSocketAddress> peers = new ArrayList<>();
359360
peers.add(new InetSocketAddress(address, port));
360361
this.subChannel = tChannel.makeSubChannel(options.getServiceName()).setPeers(peers);
361-
log.info("Initialized TChannel for service " + this.subChannel.getServiceName());
362+
log.info(
363+
"Initialized TChannel for service "
364+
+ this.subChannel.getServiceName()
365+
+ ", LibraryVersion: "
366+
+ Version.LIBRARY_VERSION
367+
+ ", FeatureVersion: "
368+
+ Version.FEATURE_VERSION);
362369
} catch (UnknownHostException e) {
363370
tChannel.shutdown();
364371
throw new RuntimeException("Unable to get name of host " + host, e);
@@ -391,6 +398,8 @@ private static Map<String, String> getThriftHeaders() {
391398
return ImmutableMap.<String, String>builder()
392399
.put("user-name", envUserName)
393400
.put("host-name", envHostname)
401+
.put("cadence-client-library-version", Version.LIBRARY_VERSION)
402+
.put("cadence-client-feature-version", Version.FEATURE_VERSION)
394403
.build();
395404
}
396405

src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import static junit.framework.TestCase.assertEquals;
21-
import static org.junit.Assert.assertNotSame;
22-
import static org.junit.Assert.fail;
21+
import static org.junit.Assert.*;
2322
import static org.mockito.Matchers.anyInt;
2423
import static org.mockito.Matchers.eq;
2524
import static org.mockito.Mockito.*;
2625

2726
import com.uber.cadence.HistoryEvent;
2827
import com.uber.cadence.PollForDecisionTaskResponse;
28+
import com.uber.cadence.WorkflowExecution;
2929
import com.uber.cadence.WorkflowQuery;
3030
import com.uber.cadence.internal.metrics.MetricsTag;
3131
import com.uber.cadence.internal.metrics.MetricsType;
@@ -61,29 +61,39 @@ public void whenHistoryIsFullNewReplayDeciderIsReturnedAndCached_InitiallyEmpty(
6161
Decider decider = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider);
6262

6363
// Assert
64-
assertEquals(decider, replayDeciderCache.getUnchecked(runId));
64+
assertNotEquals(decider, replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider));
6565
}
6666

6767
@Test
6868
public void whenHistoryIsFullNewReplayDeciderIsReturned_InitiallyCached() throws Exception {
69+
TestWorkflowService service = new TestWorkflowService();
70+
6971
// Arrange
7072
DeciderCache replayDeciderCache = new DeciderCache(10, NoopScope.getInstance());
71-
PollForDecisionTaskResponse decisionTask =
72-
HistoryUtils.generateDecisionTaskWithInitialHistory();
73+
PollForDecisionTaskResponse decisionTask1 =
74+
HistoryUtils.generateDecisionTaskWithInitialHistory(
75+
"domain", "taskList", "workflowType", service);
7376

74-
String runId = decisionTask.getWorkflowExecution().getRunId();
75-
Decider decider = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider);
76-
assertEquals(decider, replayDeciderCache.getUnchecked(runId));
77+
String runId = decisionTask1.getWorkflowExecution().getRunId();
78+
Decider decider = replayDeciderCache.getOrCreate(decisionTask1, this::createFakeDecider);
79+
80+
PollForDecisionTaskResponse decisionTask2 =
81+
HistoryUtils.generateDecisionTaskWithPartialHistoryFromExistingTask(
82+
decisionTask1, "domain", "stickyTaskList", service);
83+
84+
assertEquals(
85+
decider, replayDeciderCache.getOrCreate(decisionTask2, this::doNotCreateFakeDecider));
7786

7887
// Act
79-
Decider decider2 = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider);
88+
Decider decider2 = replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider);
8089

8190
// Assert
82-
assertEquals(decider2, replayDeciderCache.getUnchecked(runId));
83-
assertNotSame(decider2, decider);
91+
assertEquals(decider2, replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider));
92+
assertSame(decider2, decider);
93+
service.close();
8494
}
8595

86-
@Test
96+
@Test(timeout = 2000)
8797
public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
8898
// Arrange
8999
Map<String, String> tags =
@@ -97,31 +107,30 @@ public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
97107

98108
DeciderCache replayDeciderCache = new DeciderCache(10, scope);
99109
TestWorkflowService service = new TestWorkflowService();
110+
service.lockTimeSkipping("test");
100111
PollForDecisionTaskResponse decisionTask =
101112
HistoryUtils.generateDecisionTaskWithInitialHistory(
102113
"domain", "taskList", "workflowType", service);
103114

104115
String runId = decisionTask.getWorkflowExecution().getRunId();
105116
Decider decider = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider);
106-
assertEquals(decider, replayDeciderCache.getUnchecked(runId));
107117

108118
// Act
109119
PollForDecisionTaskResponse decisionTask2 =
110120
HistoryUtils.generateDecisionTaskWithPartialHistoryFromExistingTask(
111121
decisionTask, "domain", "stickyTaskList", service);
112-
Decider decider2 = replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider);
122+
Decider decider2 = replayDeciderCache.getOrCreate(decisionTask2, this::doNotCreateFakeDecider);
113123

114124
// Assert
115125
// Wait for reporter
116-
Thread.sleep(1000);
117-
verify(reporter, times(1)).reportCounter(MetricsType.STICKY_CACHE_HIT, tags, 2);
118-
assertEquals(decider2, replayDeciderCache.getUnchecked(runId));
119-
assertEquals(decider2, decider);
126+
Thread.sleep(500);
127+
verify(reporter, times(1)).reportCounter(MetricsType.STICKY_CACHE_HIT, tags, 1);
128+
assertEquals(decider, decider2);
129+
service.close();
120130
}
121131

122132
@Test
123-
public void whenHistoryIsPartialAndCacheIsEmptyThenCacheEvictedExceptionIsThrown()
124-
throws Exception {
133+
public void whenHistoryIsPartialAndCacheIsEmptyThenExceptionIsThrown() throws Exception {
125134
// Arrange
126135
Map<String, String> tags =
127136
new ImmutableMap.Builder<String, String>(2)
@@ -139,7 +148,7 @@ public void whenHistoryIsPartialAndCacheIsEmptyThenCacheEvictedExceptionIsThrown
139148

140149
try {
141150
replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider);
142-
} catch (DeciderCache.EvictedException ex) {
151+
} catch (IllegalArgumentException ex) {
143152

144153
// Wait for reporter
145154
Thread.sleep(600);
@@ -148,7 +157,7 @@ public void whenHistoryIsPartialAndCacheIsEmptyThenCacheEvictedExceptionIsThrown
148157
}
149158

150159
fail(
151-
"Expected replayDeciderCache.getOrCreate to throw ReplayDeciderCache.EvictedException but no exception was thrown");
160+
"Expected replayDeciderCache.getOrCreate to throw IllegalArgumentException but no exception was thrown");
152161
}
153162

154163
@Test
@@ -208,15 +217,23 @@ public void evictAnyWillNotInvalidateItself() throws Exception {
208217
}
209218

210219
private void assertCacheIsEmpty(DeciderCache cache, String runId) throws Exception {
211-
DeciderCache.EvictedException ex = null;
220+
Throwable ex = null;
212221
try {
213-
cache.getUnchecked(runId);
214-
} catch (DeciderCache.EvictedException e) {
222+
PollForDecisionTaskResponse decisionTask =
223+
new PollForDecisionTaskResponse()
224+
.setWorkflowExecution(new WorkflowExecution().setRunId(runId));
225+
cache.getOrCreate(decisionTask, this::doNotCreateFakeDecider);
226+
} catch (AssertionError e) {
215227
ex = e;
216228
}
217229
TestCase.assertNotNull(ex);
218230
}
219231

232+
private ReplayDecider doNotCreateFakeDecider(PollForDecisionTaskResponse response) {
233+
fail("should not be called");
234+
return null;
235+
}
236+
220237
private ReplayDecider createFakeDecider(PollForDecisionTaskResponse response) {
221238
return new ReplayDecider(
222239
new TestWorkflowService(),

0 commit comments

Comments
 (0)