Skip to content

Commit 7b4fc94

Browse files
authored
Support shadowing worker feature (#598)
1 parent f44a28a commit 7b4fc94

26 files changed

+2267
-6
lines changed

build.gradle

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ dependencies {
6767
license {
6868
header rootProject.file('license-header.txt')
6969
skipExistingHeaders true
70-
exclude 'com/uber/cadence/*.java' // generated code
71-
excludes(["**/*.json"])
70+
excludes(["**/*.json", "com/uber/cadence/*.java", "com/uber/cadence/shadower/*.java"]) // config files and generated code
7271
}
7372

7473
task initDlsSubmodule(type: Exec) {
@@ -84,7 +83,7 @@ task updateDlsSubmodule(type: Exec) {
8483

8584
compileThrift {
8685
dependsOn updateDlsSubmodule
87-
sourceItems "${projectDir}/src/main/idls/thrift/cadence.thrift","${projectDir}/src/main/idls/thrift/shared.thrift"
86+
sourceItems "${projectDir}/src/main/idls/thrift/cadence.thrift","${projectDir}/src/main/idls/thrift/shared.thrift","${projectDir}/src/main/idls/thrift/shadower.thrift"
8887

8988
nowarn true
9089
}

src/main/idls

Submodule idls updated from 3edd104 to 615f1ca
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.errors;
17+
18+
public class ErrorType {
19+
public static final String UNKNOWN_WORKFLOW_TYPE = "Unknown workflow type";
20+
}

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,4 +161,9 @@ public class MetricsType {
161161

162162
public static final String NON_DETERMINISTIC_ERROR =
163163
CADENCE_METRICS_PREFIX + "non-deterministic-error";
164+
165+
public static final String REPLAY_FAILED_COUNTER = CADENCE_METRICS_PREFIX + "replay-failed";
166+
public static final String REPLAY_SKIPPED_COUNTER = CADENCE_METRICS_PREFIX + "replay-skipped";
167+
public static final String REPLAY_SUCCESS_COUNTER = CADENCE_METRICS_PREFIX + "replay-succeed";
168+
public static final String REPLAY_LATENCY = CADENCE_METRICS_PREFIX + "replay-latency";
164169
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
public class NonRetryableException extends RuntimeException {
19+
public NonRetryableException(Throwable cause) {
20+
super(cause);
21+
}
22+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
import com.google.common.collect.Lists;
19+
import com.uber.cadence.worker.ShadowingOptions;
20+
import com.uber.cadence.worker.TimeFilter;
21+
import com.uber.cadence.worker.WorkflowStatus;
22+
import java.time.ZonedDateTime;
23+
import java.util.Collection;
24+
import java.util.stream.Collectors;
25+
26+
public class QueryBuilder {
27+
public static QueryBuilder newQueryBuilder() {
28+
return new QueryBuilder();
29+
}
30+
31+
public static QueryBuilder newQueryBuilder(ShadowingOptions options) {
32+
return new QueryBuilder()
33+
.setWorkflowTypes(options.getWorkflowTypes())
34+
.setWorkflowStartTime(options.getWorkflowStartTimeFilter())
35+
.setWorkflowStatuses(options.getWorkflowStatuses());
36+
}
37+
38+
public QueryBuilder setWorkflowTypes(Collection<String> workflowTypes) {
39+
if (workflowTypes == null || workflowTypes.isEmpty()) {
40+
return this;
41+
}
42+
43+
Collection<String> types =
44+
workflowTypes
45+
.stream()
46+
.map((wfType) -> WORKFLOW_TYPE_PLACEHOLDER + wfType)
47+
.collect(Collectors.toList());
48+
49+
String query = String.join(OR_QUERY, types);
50+
this.appendPartialQuery(query);
51+
return this;
52+
}
53+
54+
public QueryBuilder setWorkflowStatuses(Collection<WorkflowStatus> workflowStatuses) {
55+
if (workflowStatuses == null || workflowStatuses.isEmpty()) {
56+
return this;
57+
}
58+
59+
Collection<String> wfStatuses = Lists.newArrayListWithCapacity(workflowStatuses.size());
60+
for (WorkflowStatus workflowStatus : workflowStatuses) {
61+
switch (workflowStatus) {
62+
case OPEN:
63+
wfStatuses.add(CLOSE_TIME_PLACEHOLDER + " = " + MISSING_QUERY);
64+
break;
65+
case CLOSED:
66+
wfStatuses.add(CLOSE_TIME_PLACEHOLDER + " != " + MISSING_QUERY);
67+
break;
68+
default:
69+
wfStatuses.add(WORKFLOW_STATUS_PLACEHOLDER + '"' + workflowStatus + '"');
70+
break;
71+
}
72+
}
73+
74+
String query = String.join(OR_QUERY, wfStatuses);
75+
this.appendPartialQuery(query);
76+
return this;
77+
}
78+
79+
public QueryBuilder setWorkflowStartTime(TimeFilter timeFilter) {
80+
if (timeFilter == null || timeFilter.isEmpty()) {
81+
return this;
82+
}
83+
84+
Collection<String> timerFilters = Lists.newArrayListWithCapacity(2);
85+
if (timeFilter.getMinTimestamp() != null) {
86+
timerFilters.add(
87+
START_TIME_PLACEHOLDER + " >= " + toNanoSeconds(timeFilter.getMinTimestamp()));
88+
}
89+
90+
if (timeFilter.getMaxTimestamp() != null) {
91+
timerFilters.add(
92+
START_TIME_PLACEHOLDER + " <= " + toNanoSeconds(timeFilter.getMaxTimestamp()));
93+
}
94+
95+
String query = String.join(AND_QUERY, timerFilters);
96+
this.appendPartialQuery(query);
97+
return this;
98+
}
99+
100+
public String build() {
101+
return this.stringBuffer.toString();
102+
}
103+
104+
private static final String OR_QUERY = " or ";
105+
private static final String AND_QUERY = " and ";
106+
private static final String LEFT_PARENTHESES = "(";
107+
private static final String RIGHT_PARENTHESES = ")";
108+
private static final String MISSING_QUERY = "missing";
109+
private static final String WORKFLOW_TYPE_PLACEHOLDER = "WorkflowType = ";
110+
private static final String WORKFLOW_STATUS_PLACEHOLDER = "CloseStatus = ";
111+
private static final String START_TIME_PLACEHOLDER = "StartTime";
112+
private static final String CLOSE_TIME_PLACEHOLDER = "CloseTime";
113+
private static final long TIMESTAMP_SCALE = 1_000_000_000L;
114+
private StringBuffer stringBuffer;
115+
116+
private QueryBuilder() {
117+
this.stringBuffer = new StringBuffer();
118+
}
119+
120+
private void appendPartialQuery(String query) {
121+
if (query == null || query.length() == 0) {
122+
return;
123+
}
124+
125+
if (this.stringBuffer.length() != 0) {
126+
this.stringBuffer.append(AND_QUERY);
127+
}
128+
129+
this.stringBuffer.append(LEFT_PARENTHESES);
130+
this.stringBuffer.append(query);
131+
this.stringBuffer.append(RIGHT_PARENTHESES);
132+
}
133+
134+
protected static long toNanoSeconds(ZonedDateTime time) {
135+
return time.toEpochSecond() * TIMESTAMP_SCALE + time.getNano();
136+
}
137+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
import com.uber.cadence.WorkflowExecution;
19+
import com.uber.cadence.activity.ActivityMethod;
20+
import com.uber.cadence.shadower.ReplayWorkflowActivityParams;
21+
import com.uber.cadence.shadower.ReplayWorkflowActivityResult;
22+
import com.uber.cadence.shadower.shadowerConstants;
23+
import com.uber.cadence.worker.WorkflowImplementationOptions;
24+
import com.uber.cadence.workflow.Functions;
25+
26+
public interface ReplayWorkflowActivity {
27+
@ActivityMethod(name = shadowerConstants.ReplayWorkflowActivityName)
28+
ReplayWorkflowActivityResult replay(ReplayWorkflowActivityParams params) throws Exception;
29+
30+
ReplayWorkflowActivityResult replayOneExecution(String domain, WorkflowExecution execution);
31+
32+
void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses);
33+
34+
void registerWorkflowImplementationTypesWithOptions(
35+
WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses);
36+
37+
<R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Functions.Func<R> factory);
38+
39+
<R> void addWorkflowImplementationFactoryWithOptions(
40+
WorkflowImplementationOptions options, Class<R> workflowInterface, Functions.Func<R> factory);
41+
}

0 commit comments

Comments
 (0)