Skip to content

Commit ed2b8cc

Browse files
Add an interceptor for listExecutions (#2524)
Add an interceptor for listExecutions
1 parent cea73d3 commit ed2b8cc

10 files changed

+153
-34
lines changed

temporal-sdk/src/main/java/io/temporal/client/GetWorkflowExecutionHistoryIterator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.temporal.api.history.v1.HistoryEvent;
66
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
77
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
8+
import io.temporal.internal.client.EagerPaginator;
89
import io.temporal.internal.client.external.GenericWorkflowClient;
910
import java.util.List;
1011
import java.util.concurrent.CompletableFuture;
@@ -30,7 +31,7 @@ public GetWorkflowExecutionHistoryIterator(
3031
}
3132

3233
@Override
33-
CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
34+
protected CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
3435
@Nonnull ByteString nextPageToken) {
3536
GetWorkflowExecutionHistoryRequest.Builder requestBuilder =
3637
GetWorkflowExecutionHistoryRequest.newBuilder()
@@ -46,12 +47,12 @@ CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
4647
}
4748

4849
@Override
49-
ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) {
50+
protected ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) {
5051
return response.getNextPageToken();
5152
}
5253

5354
@Override
54-
List<HistoryEvent> toElements(GetWorkflowExecutionHistoryResponse response) {
55+
protected List<HistoryEvent> toElements(GetWorkflowExecutionHistoryResponse response) {
5556
return response.getHistory().getEventsList();
5657
}
5758
}

temporal-sdk/src/main/java/io/temporal/client/ListScheduleListDescriptionIterator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.temporal.api.schedule.v1.ScheduleListEntry;
55
import io.temporal.api.workflowservice.v1.ListSchedulesRequest;
66
import io.temporal.api.workflowservice.v1.ListSchedulesResponse;
7+
import io.temporal.internal.client.EagerPaginator;
78
import io.temporal.internal.client.external.GenericWorkflowClient;
89
import java.util.List;
910
import java.util.concurrent.CompletableFuture;
@@ -40,7 +41,8 @@ public ListScheduleListDescriptionIterator(
4041
}
4142

4243
@Override
43-
CompletableFuture<ListSchedulesResponse> performRequest(@Nonnull ByteString nextPageToken) {
44+
protected CompletableFuture<ListSchedulesResponse> performRequest(
45+
@Nonnull ByteString nextPageToken) {
4446
ListSchedulesRequest.Builder request =
4547
ListSchedulesRequest.newBuilder().setNamespace(namespace).setNextPageToken(nextPageToken);
4648

@@ -54,12 +56,12 @@ CompletableFuture<ListSchedulesResponse> performRequest(@Nonnull ByteString next
5456
}
5557

5658
@Override
57-
ByteString getNextPageToken(ListSchedulesResponse response) {
59+
protected ByteString getNextPageToken(ListSchedulesResponse response) {
5860
return response.getNextPageToken();
5961
}
6062

6163
@Override
62-
List<ScheduleListEntry> toElements(ListSchedulesResponse response) {
64+
protected List<ScheduleListEntry> toElements(ListSchedulesResponse response) {
6365
return response.getSchedulesList();
6466
}
6567
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import com.google.common.base.Preconditions;
66
import com.google.common.base.Strings;
7-
import com.google.common.collect.Iterators;
87
import com.google.common.reflect.TypeToken;
98
import com.uber.m3.tally.Scope;
109
import io.temporal.api.common.v1.WorkflowExecution;
@@ -250,22 +249,10 @@ public WorkflowExecutionCount countWorkflows(@Nullable String query) {
250249

251250
Stream<WorkflowExecutionMetadata> listExecutions(
252251
@Nullable String query, @Nullable Integer pageSize) {
253-
ListWorkflowExecutionIterator iterator =
254-
new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
255-
iterator.init();
256-
Iterator<WorkflowExecutionMetadata> wrappedIterator =
257-
Iterators.transform(
258-
iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));
259-
260-
// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
261-
// impossible
262-
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
263-
// API
264-
// guarantees absence of duplicates
265-
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
266-
267-
return StreamSupport.stream(
268-
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
252+
return workflowClientCallsInvoker
253+
.listWorkflowExecutions(
254+
new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize))
255+
.getStream();
269256
}
270257

271258
@Override

temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
import javax.annotation.Nonnull;
2121
import javax.annotation.Nullable;
2222

23+
/** WorkflowExecutionMetadata contains information about a workflow execution. */
2324
public class WorkflowExecutionMetadata {
2425
private final @Nonnull WorkflowExecutionInfo info;
2526
private final @Nonnull DataConverter dataConverter;
2627

27-
WorkflowExecutionMetadata(
28+
public WorkflowExecutionMetadata(
2829
@Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) {
2930
this.info = Preconditions.checkNotNull(info, "info");
3031
this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter");

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.CompletableFuture;
1111
import java.util.concurrent.TimeUnit;
1212
import java.util.concurrent.TimeoutException;
13+
import java.util.stream.Stream;
1314
import javax.annotation.Nonnull;
1415
import javax.annotation.Nullable;
1516

@@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor {
7576

7677
DescribeWorkflowOutput describe(DescribeWorkflowInput input);
7778

79+
ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input);
80+
81+
final class ListWorkflowExecutionsInput {
82+
private final String query;
83+
private final Integer pageSize;
84+
85+
public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) {
86+
this.query = query;
87+
this.pageSize = pageSize;
88+
}
89+
90+
@Nullable
91+
public String getQuery() {
92+
return query;
93+
}
94+
95+
@Nullable
96+
public Integer getPageSize() {
97+
return pageSize;
98+
}
99+
}
100+
101+
final class ListWorkflowExecutionsOutput {
102+
private final Stream<WorkflowExecutionMetadata> stream;
103+
104+
public ListWorkflowExecutionsOutput(Stream<WorkflowExecutionMetadata> stream) {
105+
this.stream = stream;
106+
}
107+
108+
public Stream<WorkflowExecutionMetadata> getStream() {
109+
return stream;
110+
}
111+
}
112+
78113
CountWorkflowOutput countWorkflows(CountWorkflowsInput input);
79114

80115
final class WorkflowStartInput {

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
7373
return next.describe(input);
7474
}
7575

76+
@Override
77+
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
78+
return next.listWorkflowExecutions(input);
79+
}
80+
7681
@Override
7782
public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
7883
return next.countWorkflows(input);

temporal-sdk/src/main/java/io/temporal/client/EagerPaginator.java renamed to temporal-sdk/src/main/java/io/temporal/internal/client/EagerPaginator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.temporal.client;
1+
package io.temporal.internal.client;
22

33
import com.google.protobuf.ByteString;
44
import java.util.Iterator;
@@ -15,7 +15,7 @@
1515
* previous page. The main goal of this approach is to reduce a synchronous wait that would
1616
* otherwise happen when a first element of the next page is requested.
1717
*/
18-
abstract class EagerPaginator<Resp, T> implements Iterator<T> {
18+
public abstract class EagerPaginator<Resp, T> implements Iterator<T> {
1919
private List<T> activeResponse;
2020
private int nextActiveResponseIndex;
2121
private CompletableFuture<Resp> nextResponse;
@@ -92,9 +92,9 @@ private Resp waitAndGetNextResponse() {
9292
return response;
9393
}
9494

95-
abstract CompletableFuture<Resp> performRequest(@Nonnull ByteString nextPageToken);
95+
protected abstract CompletableFuture<Resp> performRequest(@Nonnull ByteString nextPageToken);
9696

97-
abstract ByteString getNextPageToken(Resp response);
97+
protected abstract ByteString getNextPageToken(Resp response);
9898

99-
abstract List<T> toElements(Resp response);
99+
protected abstract List<T> toElements(Resp response);
100100
}

temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java renamed to temporal-sdk/src/main/java/io/temporal/internal/client/ListWorkflowExecutionIterator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.temporal.client;
1+
package io.temporal.internal.client;
22

33
import com.google.protobuf.ByteString;
44
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
@@ -18,7 +18,7 @@ class ListWorkflowExecutionIterator
1818
private final @Nullable Integer pageSize;
1919
private final @Nonnull GenericWorkflowClient genericClient;
2020

21-
ListWorkflowExecutionIterator(
21+
public ListWorkflowExecutionIterator(
2222
@Nullable String query,
2323
@Nonnull String namespace,
2424
@Nullable Integer pageSize,
@@ -30,7 +30,7 @@ class ListWorkflowExecutionIterator
3030
}
3131

3232
@Override
33-
CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
33+
protected CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
3434
@Nonnull ByteString nextPageToken) {
3535
ListWorkflowExecutionsRequest.Builder request =
3636
ListWorkflowExecutionsRequest.newBuilder()
@@ -49,12 +49,12 @@ CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
4949
}
5050

5151
@Override
52-
ByteString getNextPageToken(ListWorkflowExecutionsResponse response) {
52+
protected ByteString getNextPageToken(ListWorkflowExecutionsResponse response) {
5353
return response.getNextPageToken();
5454
}
5555

5656
@Override
57-
List<WorkflowExecutionInfo> toElements(ListWorkflowExecutionsResponse response) {
57+
protected List<WorkflowExecutionInfo> toElements(ListWorkflowExecutionsResponse response) {
5858
return response.getExecutionsList();
5959
}
6060
}

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
66
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;
77

8+
import com.google.common.collect.Iterators;
89
import io.grpc.Deadline;
910
import io.grpc.Status;
1011
import io.grpc.StatusRuntimeException;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.TimeoutException;
34+
import java.util.stream.StreamSupport;
3335
import javax.annotation.Nullable;
3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
@@ -690,6 +692,29 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
690692
return new CountWorkflowOutput(new WorkflowExecutionCount(resp));
691693
}
692694

695+
@Override
696+
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
697+
ListWorkflowExecutionIterator iterator =
698+
new ListWorkflowExecutionIterator(
699+
input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient);
700+
iterator.init();
701+
Iterator<WorkflowExecutionMetadata> wrappedIterator =
702+
Iterators.transform(
703+
iterator,
704+
info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter()));
705+
706+
// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
707+
// impossible
708+
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
709+
// API
710+
// guarantees absence of duplicates
711+
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
712+
713+
return new ListWorkflowExecutionsOutput(
714+
StreamSupport.stream(
715+
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false));
716+
}
717+
693718
private static <R> R convertResultPayloads(
694719
Optional<Payloads> resultValue,
695720
Class<R> resultClass,
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.temporal.client;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assume.assumeTrue;
6+
7+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
8+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
9+
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
10+
import io.temporal.testing.internal.SDKTestWorkflowRule;
11+
import io.temporal.workflow.shared.TestWorkflows;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import org.junit.Rule;
14+
import org.junit.Test;
15+
16+
public class ListWorkflowExecutionsInterceptorTest {
17+
@Rule
18+
public SDKTestWorkflowRule testWorkflowRule =
19+
SDKTestWorkflowRule.newBuilder()
20+
.setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class)
21+
.build();
22+
23+
@Test
24+
public void listExecutions_isIntercepted() throws InterruptedException {
25+
assumeTrue(
26+
"Test Server doesn't support listWorkflowExecutions endpoint yet",
27+
SDKTestWorkflowRule.useExternalService);
28+
29+
AtomicInteger intercepted = new AtomicInteger();
30+
WorkflowClient workflowClient =
31+
WorkflowClient.newInstance(
32+
testWorkflowRule.getWorkflowServiceStubs(),
33+
WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions())
34+
.setInterceptors(
35+
new WorkflowClientInterceptorBase() {
36+
@Override
37+
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
38+
WorkflowClientCallsInterceptor next) {
39+
return new WorkflowClientCallsInterceptorBase(next) {
40+
@Override
41+
public ListWorkflowExecutionsOutput listWorkflowExecutions(
42+
ListWorkflowExecutionsInput input) {
43+
intercepted.incrementAndGet();
44+
return super.listWorkflowExecutions(input);
45+
}
46+
};
47+
}
48+
})
49+
.validateAndBuildWithDefaults());
50+
51+
WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class))
52+
.start();
53+
54+
// Visibility API is eventually consistent
55+
Thread.sleep(2_000);
56+
java.util.List<WorkflowExecutionMetadata> result =
57+
workflowClient
58+
.listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'")
59+
.collect(java.util.stream.Collectors.toList());
60+
assertFalse(result.isEmpty());
61+
assertEquals(1, intercepted.get());
62+
}
63+
}

0 commit comments

Comments
 (0)