Skip to content

Commit 6655ddc

Browse files
authored
Fix encoding of shadowing activities (#600)
* Fix encoding of shadowing activities
1 parent d8b08b0 commit 6655ddc

15 files changed

+465
-109
lines changed

src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivity.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package com.uber.cadence.internal.shadowing;
1717

18-
import com.uber.cadence.WorkflowExecution;
1918
import com.uber.cadence.activity.ActivityMethod;
20-
import com.uber.cadence.shadower.ReplayWorkflowActivityParams;
21-
import com.uber.cadence.shadower.ReplayWorkflowActivityResult;
2219
import com.uber.cadence.shadower.shadowerConstants;
2320
import com.uber.cadence.worker.WorkflowImplementationOptions;
2421
import com.uber.cadence.workflow.Functions;

src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@
2222
import com.uber.cadence.History;
2323
import com.uber.cadence.HistoryEvent;
2424
import com.uber.cadence.HistoryEventFilterType;
25-
import com.uber.cadence.WorkflowExecution;
2625
import com.uber.cadence.activity.Activity;
2726
import com.uber.cadence.common.WorkflowExecutionHistory;
2827
import com.uber.cadence.internal.common.InternalUtils;
2928
import com.uber.cadence.internal.common.RpcRetryer;
3029
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3130
import com.uber.cadence.internal.metrics.MetricsType;
3231
import com.uber.cadence.serviceclient.IWorkflowService;
33-
import com.uber.cadence.shadower.ReplayWorkflowActivityParams;
34-
import com.uber.cadence.shadower.ReplayWorkflowActivityResult;
3532
import com.uber.cadence.testing.TestWorkflowEnvironment;
3633
import com.uber.cadence.worker.Worker;
3734
import com.uber.cadence.worker.WorkflowImplementationOptions;
@@ -99,8 +96,9 @@ public ReplayWorkflowActivityResult replay(ReplayWorkflowActivityParams request)
9996

10097
// Retrieve process from heartbeat
10198
Optional<HeartbeatDetail> heartbeatDetail = Activity.getHeartbeatDetails(HeartbeatDetail.class);
99+
ReplayWorkflowActivityResult heartbeatResult;
102100
if (heartbeatDetail.isPresent()) {
103-
ReplayWorkflowActivityResult heartbeatResult = heartbeatDetail.get().getReplayResult();
101+
heartbeatResult = heartbeatDetail.get().getReplayResult();
104102
successCount = heartbeatResult.getSucceeded();
105103
failedCount = heartbeatResult.getFailed();
106104
skippedCount = heartbeatResult.getSkipped();
@@ -114,21 +112,22 @@ public ReplayWorkflowActivityResult replay(ReplayWorkflowActivityParams request)
114112
successCount += oneReplayResult.getSucceeded();
115113
failedCount += oneReplayResult.getFailed();
116114
skippedCount += oneReplayResult.getSkipped();
117-
ReplayWorkflowActivityResult heartbeatResult =
118-
new ReplayWorkflowActivityResult()
119-
.setSucceeded(successCount)
120-
.setFailed(failedCount)
121-
.setSkipped(skippedCount);
115+
heartbeatResult = new ReplayWorkflowActivityResult();
116+
heartbeatResult.setSucceeded(successCount);
117+
heartbeatResult.setFailed(failedCount);
118+
heartbeatResult.setSkipped(skippedCount);
122119
Activity.heartbeat(new HeartbeatDetail(heartbeatResult, replayIndex));
123120
}
124-
return new ReplayWorkflowActivityResult()
125-
.setSucceeded(successCount)
126-
.setFailed(failedCount)
127-
.setSkipped(skippedCount);
121+
ReplayWorkflowActivityResult result = new ReplayWorkflowActivityResult();
122+
result.setSucceeded(successCount);
123+
result.setFailed(failedCount);
124+
result.setSkipped(skippedCount);
125+
return result;
128126
}
129127

130128
public ReplayWorkflowActivityResult replayOneExecution(
131129
String domain, WorkflowExecution execution) {
130+
ReplayWorkflowActivityResult result = new ReplayWorkflowActivityResult();
132131
WorkflowExecutionHistory workflowHistory;
133132
try {
134133
workflowHistory = getFullHistory(domain, execution);
@@ -139,23 +138,27 @@ public ReplayWorkflowActivityResult replayOneExecution(
139138
+ ". Execution: "
140139
+ execution.toString(),
141140
e);
142-
return new ReplayWorkflowActivityResult().setSkipped(1);
141+
result.setSkipped(1);
142+
return result;
143143
}
144144

145145
try {
146146
boolean isSuccess = replayWorkflowHistory(domain, execution, workflowHistory);
147147
if (isSuccess) {
148148
this.metricsScope.counter(MetricsType.REPLAY_SUCCESS_COUNTER).inc(1);
149-
return new ReplayWorkflowActivityResult().setSucceeded(1);
149+
result.setSucceeded(1);
150+
return result;
150151
} else {
151152
this.metricsScope.counter(MetricsType.REPLAY_SKIPPED_COUNTER).inc(1);
152-
return new ReplayWorkflowActivityResult().setSkipped(1);
153+
result.setSkipped(1);
154+
return result;
153155
}
154156
} catch (NonRetryableException e) {
155157
throw e;
156158
} catch (Exception e) {
157159
this.metricsScope.counter(MetricsType.REPLAY_FAILED_COUNTER).inc(1);
158-
return new ReplayWorkflowActivityResult().setFailed(1);
160+
result.setFailed(1);
161+
return result;
159162
}
160163
}
161164

@@ -170,7 +173,7 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti
170173
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
171174
() ->
172175
WorkflowExecutionUtils.getHistoryPage(
173-
nextPageToken, this.serviceClient, domain, execution));
176+
nextPageToken, this.serviceClient, domain, execution.toThrift()));
174177
pageToken = resp.getNextPageToken();
175178

176179
// handle raw history
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
import java.util.List;
19+
20+
/**
21+
* This class is the JSON serializable class of {@link
22+
* com.uber.cadence.shadower.ReplayWorkflowActivityParams} Make sure this class is sync with auto
23+
* generated ReplayWorkflowActivityParams
24+
*/
25+
public class ReplayWorkflowActivityParams {
26+
private String domain;
27+
private List<WorkflowExecution> executions;
28+
29+
public ReplayWorkflowActivityParams() {}
30+
31+
public String getDomain() {
32+
return domain;
33+
}
34+
35+
public void setDomain(String domain) {
36+
this.domain = domain;
37+
}
38+
39+
public List<WorkflowExecution> getExecutions() {
40+
return executions;
41+
}
42+
43+
public void setExecutions(List<WorkflowExecution> executions) {
44+
this.executions = executions;
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return "ReplayWorkflowActivityParams{"
50+
+ "domain='"
51+
+ domain
52+
+ '\''
53+
+ ", executions="
54+
+ executions
55+
+ '}';
56+
}
57+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
/**
19+
* This class is the JSON serializable class of {@link
20+
* com.uber.cadence.shadower.ReplayWorkflowActivityResult} Make sure this class is sync with auto
21+
* generated ReplayWorkflowActivityResult
22+
*/
23+
public class ReplayWorkflowActivityResult {
24+
private int succeeded;
25+
private int skipped;
26+
private int failed;
27+
28+
public ReplayWorkflowActivityResult() {}
29+
30+
public int getSucceeded() {
31+
return succeeded;
32+
}
33+
34+
public void setSucceeded(int succeeded) {
35+
this.succeeded = succeeded;
36+
}
37+
38+
public int getSkipped() {
39+
return skipped;
40+
}
41+
42+
public void setSkipped(int skipped) {
43+
this.skipped = skipped;
44+
}
45+
46+
public int getFailed() {
47+
return failed;
48+
}
49+
50+
public void setFailed(int failed) {
51+
this.failed = failed;
52+
}
53+
54+
@Override
55+
public String toString() {
56+
return "ReplayWorkflowActivityResult{"
57+
+ "succeeded="
58+
+ succeeded
59+
+ ", skipped="
60+
+ skipped
61+
+ ", failed="
62+
+ failed
63+
+ '}';
64+
}
65+
}

src/main/java/com/uber/cadence/internal/shadowing/ScanWorkflowActivity.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import static com.uber.cadence.shadower.shadowerConstants.ScanWorkflowActivityName;
1919

2020
import com.uber.cadence.activity.ActivityMethod;
21-
import com.uber.cadence.shadower.ScanWorkflowActivityParams;
22-
import com.uber.cadence.shadower.ScanWorkflowActivityResult;
2321

2422
public interface ScanWorkflowActivity {
2523
@ActivityMethod(name = ScanWorkflowActivityName)

src/main/java/com/uber/cadence/internal/shadowing/ScanWorkflowActivityImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import com.uber.cadence.WorkflowExecutionInfo;
2525
import com.uber.cadence.internal.common.RpcRetryer;
2626
import com.uber.cadence.serviceclient.IWorkflowService;
27-
import com.uber.cadence.shadower.ScanWorkflowActivityParams;
28-
import com.uber.cadence.shadower.ScanWorkflowActivityResult;
2927
import java.util.List;
3028
import java.util.Objects;
3129
import java.util.stream.Collectors;
@@ -55,9 +53,14 @@ public ScanWorkflowActivityResult scan(ScanWorkflowActivityParams params) throws
5553
List<WorkflowExecution> executions =
5654
samplingWorkflows(resp.getExecutions(), params.getSamplingRate());
5755

58-
return new ScanWorkflowActivityResult()
59-
.setExecutions(executions)
60-
.setNextPageToken(resp.getNextPageToken());
56+
ScanWorkflowActivityResult result = new ScanWorkflowActivityResult();
57+
result.setExecutions(
58+
executions
59+
.stream()
60+
.map(com.uber.cadence.internal.shadowing.WorkflowExecution::new)
61+
.collect(Collectors.toList()));
62+
result.setNextPageToken(resp.getNextPageToken());
63+
return result;
6164
}
6265

6366
protected ListWorkflowExecutionsResponse scanWorkflows(ListWorkflowExecutionsRequest request)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
package com.uber.cadence.internal.shadowing;
17+
18+
import java.util.Arrays;
19+
20+
/**
21+
* This class is the JSON serializable class of {@link
22+
* com.uber.cadence.shadower.ScanWorkflowActivityParams} Make sure this class is sync with auto
23+
* generated ScanWorkflowActivityParams
24+
*/
25+
public class ScanWorkflowActivityParams {
26+
private String domain;
27+
private String workflowQuery;
28+
private double samplingRate;
29+
private int pageSize;
30+
private byte[] nextPageToken;
31+
32+
public ScanWorkflowActivityParams() {}
33+
34+
public String getDomain() {
35+
return domain;
36+
}
37+
38+
public void setDomain(String domain) {
39+
this.domain = domain;
40+
}
41+
42+
public String getWorkflowQuery() {
43+
return workflowQuery;
44+
}
45+
46+
public void setWorkflowQuery(String workflowQuery) {
47+
this.workflowQuery = workflowQuery;
48+
}
49+
50+
public double getSamplingRate() {
51+
return samplingRate;
52+
}
53+
54+
public void setSamplingRate(double samplingRate) {
55+
this.samplingRate = samplingRate;
56+
}
57+
58+
public int getPageSize() {
59+
return pageSize;
60+
}
61+
62+
public void setPageSize(int pageSize) {
63+
this.pageSize = pageSize;
64+
}
65+
66+
public byte[] getNextPageToken() {
67+
return nextPageToken;
68+
}
69+
70+
public void setNextPageToken(byte[] nextPageToken) {
71+
this.nextPageToken = nextPageToken;
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "ScanWorkflowActivityParams{"
77+
+ "domain='"
78+
+ domain
79+
+ '\''
80+
+ ", workflowQuery='"
81+
+ workflowQuery
82+
+ '\''
83+
+ ", samplingRate="
84+
+ samplingRate
85+
+ ", pageSize="
86+
+ pageSize
87+
+ ", nextPageToken="
88+
+ Arrays.toString(nextPageToken)
89+
+ '}';
90+
}
91+
}

0 commit comments

Comments
 (0)