Skip to content

Commit db1dbca

Browse files
authored
Adding logs to internal shadower to check for failures log (#918)
* Added logs to scanworkflow * lint * random change because pipeline was blocked. test
1 parent 615c2c4 commit db1dbca

File tree

1 file changed

+30
-10
lines changed

1 file changed

+30
-10
lines changed

src/main/java/com/uber/cadence/internal/shadowing/ScanWorkflowActivityImpl.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,22 @@ public ScanWorkflowActivityImpl(IWorkflowService serviceClient) {
4242

4343
@Override
4444
public ScanWorkflowActivityResult scan(ScanWorkflowActivityParams params) throws Throwable {
45+
log.info("Starting scan with params: {} ", params);
46+
4547
ListWorkflowExecutionsRequest scanRequest =
4648
new ListWorkflowExecutionsRequest()
4749
.setDomain(params.getDomain())
4850
.setNextPageToken(params.getNextPageToken())
4951
.setPageSize(params.getPageSize())
5052
.setQuery(params.getWorkflowQuery());
53+
54+
log.debug("Created ListWorkflowExecutionsRequest: {} ", scanRequest);
55+
log.info("Scanning workflows with query: {}", params.getWorkflowQuery());
56+
5157
ListWorkflowExecutionsResponse resp = scanWorkflows(scanRequest);
5258

59+
log.info("Received response with {} executions", resp.getExecutions().size());
60+
5361
List<WorkflowExecution> executions =
5462
samplingWorkflows(resp.getExecutions(), params.getSamplingRate());
5563

@@ -60,18 +68,26 @@ public ScanWorkflowActivityResult scan(ScanWorkflowActivityParams params) throws
6068
.map(com.uber.cadence.internal.shadowing.WorkflowExecution::new)
6169
.collect(Collectors.toList()));
6270
result.setNextPageToken(resp.getNextPageToken());
71+
log.info("Scan completed with {} sampled executions", executions.size());
6372
return result;
6473
}
6574

6675
protected ListWorkflowExecutionsResponse scanWorkflows(ListWorkflowExecutionsRequest request)
6776
throws Throwable {
77+
log.info(
78+
"Scanning workflows for domain: {} with query: {}",
79+
request.getDomain(),
80+
request.getQuery());
6881
try {
69-
return RpcRetryer.retryWithResult(
70-
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
71-
() -> this.serviceClient.ScanWorkflowExecutions(request));
82+
ListWorkflowExecutionsResponse response =
83+
RpcRetryer.retryWithResult(
84+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
85+
() -> this.serviceClient.ScanWorkflowExecutions(request));
86+
log.info("Successfully scanned workflows for domain: {}", request.getDomain());
87+
return response;
7288
} catch (BadRequestError | EntityNotExistsError | ClientVersionNotSupportedError e) {
7389
log.error(
74-
"failed to scan workflow records with non-retryable error. domain: "
90+
"failed to scan workflow records with non-retryable error. Domain: "
7591
+ request.getDomain()
7692
+ "; query: "
7793
+ request.getQuery(),
@@ -90,13 +106,17 @@ protected ListWorkflowExecutionsResponse scanWorkflows(ListWorkflowExecutionsReq
90106

91107
protected List<WorkflowExecution> samplingWorkflows(
92108
List<WorkflowExecutionInfo> executionInfoList, double samplingRate) {
109+
log.info("Sampling workflows with rate: {}", samplingRate);
93110
int capacity = (int) (executionInfoList.size() * samplingRate);
94111
capacity = Math.max(capacity, 1);
95-
return executionInfoList
96-
.stream()
97-
.unordered()
98-
.map((executionInfo -> executionInfo.getExecution()))
99-
.limit((long) (capacity))
100-
.collect(Collectors.toList());
112+
List<WorkflowExecution> sampledExecutions =
113+
executionInfoList
114+
.stream()
115+
.unordered()
116+
.map((executionInfo -> executionInfo.getExecution()))
117+
.limit((long) (capacity))
118+
.collect(Collectors.toList());
119+
log.info("Sampled {} workflows out of {}", sampledExecutions.size(), executionInfoList.size());
120+
return sampledExecutions;
101121
}
102122
}

0 commit comments

Comments
 (0)