Skip to content

Commit 4871168

Browse files
authored
Test server Nexus endpoint operator apis (#2162)
* Bump API version to v1.36.0 * Nexus endpoint test server CRUD API implementation * cleanup * functional tests * test operator service external setup * test environment setup * test environment setup * skip functional tests with external server
1 parent 5d22bb5 commit 4871168

File tree

8 files changed

+732
-3
lines changed

8 files changed

+732
-3
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.testservice;
22+
23+
import io.temporal.api.nexus.v1.Endpoint;
24+
import io.temporal.api.nexus.v1.EndpointSpec;
25+
import java.io.Closeable;
26+
import java.util.List;
27+
28+
public interface TestNexusEndpointStore extends Closeable {
29+
30+
Endpoint createEndpoint(EndpointSpec spec);
31+
32+
Endpoint updateEndpoint(String id, long version, EndpointSpec spec);
33+
34+
void deleteEndpoint(String id, long version);
35+
36+
Endpoint getEndpoint(String id);
37+
38+
List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name);
39+
40+
void validateEndpointSpec(EndpointSpec spec);
41+
42+
@Override
43+
void close();
44+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.testservice;
22+
23+
import io.grpc.Status;
24+
import io.temporal.api.nexus.v1.Endpoint;
25+
import io.temporal.api.nexus.v1.EndpointSpec;
26+
import java.util.*;
27+
import java.util.concurrent.ConcurrentSkipListMap;
28+
import java.util.regex.Pattern;
29+
import java.util.stream.Collectors;
30+
31+
/**
32+
* TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for
33+
* use with the test server. Because conflict resolution is not required, there is no handling for
34+
* created or updated timestamps.
35+
*/
36+
public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore {
37+
38+
private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");
39+
40+
private final SortedMap<String, Endpoint> endpoints = new ConcurrentSkipListMap<>();
41+
private final Set<String> endpointNames = new HashSet<>();
42+
43+
@Override
44+
public Endpoint createEndpoint(EndpointSpec spec) {
45+
validateEndpointSpec(spec);
46+
47+
if (!endpointNames.add(spec.getName())) {
48+
throw Status.ALREADY_EXISTS
49+
.withDescription("Nexus endpoint already registered with name: " + spec.getName())
50+
.asRuntimeException();
51+
}
52+
53+
String id = UUID.randomUUID().toString();
54+
Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build();
55+
56+
if (endpoints.putIfAbsent(id, endpoint) != null) {
57+
// This should never happen in practice
58+
throw Status.ALREADY_EXISTS
59+
.withDescription("Nexus endpoint already exists with ID: " + id)
60+
.asRuntimeException();
61+
}
62+
63+
return endpoint;
64+
}
65+
66+
@Override
67+
public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) {
68+
validateEndpointSpec(spec);
69+
70+
Endpoint prev = endpoints.get(id);
71+
72+
if (prev == null) {
73+
throw Status.NOT_FOUND
74+
.withDescription("Could not find Nexus endpoint with ID: " + id)
75+
.asRuntimeException();
76+
}
77+
78+
if (prev.getVersion() != version) {
79+
throw Status.INVALID_ARGUMENT
80+
.withDescription(
81+
"Error updating Nexus endpoint: version mismatch."
82+
+ " Expected: "
83+
+ prev.getVersion()
84+
+ " Received: "
85+
+ version)
86+
.asRuntimeException();
87+
}
88+
89+
if (!prev.getSpec().getName().equals(spec.getName()) && !endpointNames.add(spec.getName())) {
90+
throw Status.ALREADY_EXISTS
91+
.withDescription(
92+
"Error updating Nexus endpoint: "
93+
+ "endpoint already registered with updated name: "
94+
+ spec.getName())
95+
.asRuntimeException();
96+
} else {
97+
endpointNames.remove(prev.getSpec().getName());
98+
}
99+
100+
Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build();
101+
102+
endpoints.put(id, updated);
103+
return updated;
104+
}
105+
106+
@Override
107+
public void deleteEndpoint(String id, long version) {
108+
Endpoint existing = endpoints.get(id);
109+
110+
if (existing == null) {
111+
throw Status.NOT_FOUND
112+
.withDescription("Could not find Nexus endpoint with ID: " + id)
113+
.asRuntimeException();
114+
}
115+
116+
if (existing.getVersion() != version) {
117+
throw Status.INVALID_ARGUMENT
118+
.withDescription(
119+
"Error deleting Nexus endpoint: version mismatch."
120+
+ " Expected "
121+
+ existing.getVersion()
122+
+ " Received: "
123+
+ version)
124+
.asRuntimeException();
125+
}
126+
127+
endpoints.remove(id);
128+
}
129+
130+
@Override
131+
public Endpoint getEndpoint(String id) {
132+
Endpoint endpoint = endpoints.get(id);
133+
if (endpoint == null) {
134+
throw Status.NOT_FOUND
135+
.withDescription("Could not find Nexus endpoint with ID: " + id)
136+
.asRuntimeException();
137+
}
138+
return endpoint;
139+
}
140+
141+
@Override
142+
public List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name) {
143+
if (name != null && !name.isEmpty()) {
144+
return endpoints.values().stream()
145+
.filter(ep -> ep.getSpec().getName().equals(name))
146+
.limit(1)
147+
.collect(Collectors.toList());
148+
}
149+
150+
if (nextPageToken.length > 0) {
151+
return endpoints.tailMap(new String(nextPageToken)).values().stream()
152+
.skip(1)
153+
.limit(pageSize)
154+
.collect(Collectors.toList());
155+
}
156+
return endpoints.values().stream().limit(pageSize).collect(Collectors.toList());
157+
}
158+
159+
@Override
160+
public void validateEndpointSpec(EndpointSpec spec) {
161+
if (spec.getName().isEmpty()) {
162+
throw Status.INVALID_ARGUMENT
163+
.withDescription("Nexus endpoint name cannot be empty")
164+
.asRuntimeException();
165+
}
166+
if (!ENDPOINT_NAME_REGEX.matcher(spec.getName()).matches()) {
167+
throw Status.INVALID_ARGUMENT
168+
.withDescription(
169+
"Nexus endpoint name ("
170+
+ spec.getName()
171+
+ ") does not match expected pattern: "
172+
+ ENDPOINT_NAME_REGEX.pattern())
173+
.asRuntimeException();
174+
}
175+
if (!spec.hasTarget()) {
176+
throw Status.INVALID_ARGUMENT
177+
.withDescription("Nexus endpoint spec must have a target")
178+
.asRuntimeException();
179+
}
180+
if (!spec.getTarget().hasWorker()) {
181+
throw Status.INVALID_ARGUMENT
182+
.withDescription("Test server only supports Nexus endpoints with worker targets")
183+
.asRuntimeException();
184+
}
185+
}
186+
187+
@Override
188+
public void close() {}
189+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020

2121
package io.temporal.internal.testservice;
2222

23+
import com.google.protobuf.ByteString;
2324
import io.grpc.Status;
2425
import io.grpc.StatusRuntimeException;
2526
import io.grpc.stub.StreamObserver;
2627
import io.temporal.api.enums.v1.IndexedValueType;
28+
import io.temporal.api.nexus.v1.Endpoint;
2729
import io.temporal.api.operatorservice.v1.*;
2830
import java.io.Closeable;
31+
import java.util.List;
2932
import java.util.Map;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
@@ -40,9 +43,12 @@ final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplB
4043
private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class);
4144

4245
private final TestVisibilityStore visibilityStore;
46+
private final TestNexusEndpointStore nexusEndpointStore;
4347

44-
public TestOperatorService(TestVisibilityStore visibilityStore) {
48+
public TestOperatorService(
49+
TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) {
4550
this.visibilityStore = visibilityStore;
51+
this.nexusEndpointStore = nexusEndpointStore;
4652
}
4753

4854
@Override
@@ -93,6 +99,84 @@ public void removeSearchAttributes(
9399
}
94100
}
95101

102+
@Override
103+
public void getNexusEndpoint(
104+
GetNexusEndpointRequest request, StreamObserver<GetNexusEndpointResponse> responseObserver) {
105+
try {
106+
Endpoint endpoint = nexusEndpointStore.getEndpoint(request.getId());
107+
responseObserver.onNext(GetNexusEndpointResponse.newBuilder().setEndpoint(endpoint).build());
108+
responseObserver.onCompleted();
109+
} catch (StatusRuntimeException e) {
110+
handleStatusRuntimeException(e, responseObserver);
111+
}
112+
}
113+
114+
@Override
115+
public void createNexusEndpoint(
116+
CreateNexusEndpointRequest request,
117+
StreamObserver<CreateNexusEndpointResponse> responseObserver) {
118+
try {
119+
Endpoint created = nexusEndpointStore.createEndpoint(request.getSpec());
120+
responseObserver.onNext(
121+
CreateNexusEndpointResponse.newBuilder().setEndpoint(created).build());
122+
responseObserver.onCompleted();
123+
} catch (StatusRuntimeException e) {
124+
handleStatusRuntimeException(e, responseObserver);
125+
}
126+
}
127+
128+
@Override
129+
public void updateNexusEndpoint(
130+
UpdateNexusEndpointRequest request,
131+
StreamObserver<UpdateNexusEndpointResponse> responseObserver) {
132+
try {
133+
Endpoint updated =
134+
nexusEndpointStore.updateEndpoint(
135+
request.getId(), request.getVersion(), request.getSpec());
136+
responseObserver.onNext(
137+
UpdateNexusEndpointResponse.newBuilder().setEndpoint(updated).build());
138+
responseObserver.onCompleted();
139+
} catch (StatusRuntimeException e) {
140+
handleStatusRuntimeException(e, responseObserver);
141+
}
142+
}
143+
144+
@Override
145+
public void deleteNexusEndpoint(
146+
DeleteNexusEndpointRequest request,
147+
StreamObserver<DeleteNexusEndpointResponse> responseObserver) {
148+
try {
149+
nexusEndpointStore.deleteEndpoint(request.getId(), request.getVersion());
150+
responseObserver.onNext(DeleteNexusEndpointResponse.newBuilder().build());
151+
responseObserver.onCompleted();
152+
} catch (StatusRuntimeException e) {
153+
handleStatusRuntimeException(e, responseObserver);
154+
}
155+
}
156+
157+
@Override
158+
public void listNexusEndpoints(
159+
ListNexusEndpointsRequest request,
160+
StreamObserver<ListNexusEndpointsResponse> responseObserver) {
161+
try {
162+
List<Endpoint> endpoints =
163+
nexusEndpointStore.listEndpoints(
164+
request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName());
165+
ByteString nextPageToken =
166+
(!endpoints.isEmpty() && endpoints.size() == request.getPageSize())
167+
? endpoints.get(endpoints.size() - 1).getIdBytes()
168+
: ByteString.empty();
169+
responseObserver.onNext(
170+
ListNexusEndpointsResponse.newBuilder()
171+
.addAllEndpoints(endpoints)
172+
.setNextPageToken(nextPageToken)
173+
.build());
174+
responseObserver.onCompleted();
175+
} catch (StatusRuntimeException e) {
176+
handleStatusRuntimeException(e, responseObserver);
177+
}
178+
}
179+
96180
private void handleStatusRuntimeException(
97181
StatusRuntimeException e, StreamObserver<?> responseObserver) {
98182
if (e.getStatus().getCode() == Status.Code.INTERNAL) {

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
public class TestServicesStarter implements Closeable {
3131
private final SelfAdvancingTimerImpl selfAdvancingTimer;
3232
private final TestVisibilityStore visibilityStore = new TestVisibilityStoreImpl();
33+
private final TestNexusEndpointStore nexusEndpointStore = new TestNexusEndpointStoreImpl();
3334
private final TestWorkflowStore workflowStore;
3435
private final TestOperatorService operatorService;
3536
private final TestWorkflowService workflowService;
@@ -46,7 +47,7 @@ public TestServicesStarter(boolean lockTimeSkipping, long initialTimeMillis) {
4647
this.selfAdvancingTimer =
4748
new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
4849
this.workflowStore = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
49-
this.operatorService = new TestOperatorService(this.visibilityStore);
50+
this.operatorService = new TestOperatorService(this.visibilityStore, this.nexusEndpointStore);
5051
this.testService =
5152
new TestService(this.workflowStore, this.selfAdvancingTimer, lockTimeSkipping);
5253
this.workflowService =

0 commit comments

Comments
 (0)