Skip to content

Commit abc5323

Browse files
authored
Implement test server support for sync Nexus operation commands (#2176)
* Implement test server support for sync Nexus operations * Nexus operations command implementations * test cleanup * cleanup * tests
1 parent a5d6e60 commit abc5323

14 files changed

+1516
-97
lines changed

temporal-test-server/src/main/java/io/temporal/internal/testservice/CommandVerifier.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828

2929
class CommandVerifier {
3030
private final TestVisibilityStore visibilityStore;
31+
private final TestNexusEndpointStore nexusEndpointStore;
3132

32-
public CommandVerifier(TestVisibilityStore visibilityStore) {
33+
public CommandVerifier(
34+
TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) {
3335
this.visibilityStore = visibilityStore;
36+
this.nexusEndpointStore = nexusEndpointStore;
3437
}
3538

3639
InvalidCommandResult verifyCommand(RequestContext ctx, Command d) {
@@ -52,6 +55,27 @@ InvalidCommandResult verifyCommand(RequestContext ctx, Command d) {
5255
eventAttributesFailure,
5356
e);
5457
}
58+
break;
59+
case COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION:
60+
try {
61+
nexusEndpointStore.getEndpointByName(
62+
d.getScheduleNexusOperationCommandAttributes().getEndpoint());
63+
} catch (StatusRuntimeException e) {
64+
ServerFailure eventAttributesFailure =
65+
new ServerFailure(
66+
ProtoEnumNameUtils.uniqueToSimplifiedName(
67+
WorkflowTaskFailedCause
68+
.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES)
69+
+ ": "
70+
+ e.getStatus().getDescription(),
71+
true);
72+
return new InvalidCommandResult(
73+
WorkflowTaskFailedCause
74+
.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
75+
eventAttributesFailure,
76+
e);
77+
}
78+
break;
5579
}
5680
return null;
5781
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.testservice;
22+
23+
import com.google.protobuf.ByteString;
24+
import io.grpc.Status;
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import java.io.*;
27+
import java.util.Objects;
28+
import javax.annotation.Nonnull;
29+
30+
class NexusTaskToken {
31+
32+
@Nonnull private final ExecutionId executionId;
33+
private final long scheduledEventId;
34+
private final int attempt;
35+
36+
NexusTaskToken(
37+
@Nonnull String namespace,
38+
@Nonnull WorkflowExecution execution,
39+
long scheduledEventId,
40+
int attempt) {
41+
this(
42+
new ExecutionId(Objects.requireNonNull(namespace), Objects.requireNonNull(execution)),
43+
scheduledEventId,
44+
attempt);
45+
}
46+
47+
NexusTaskToken(
48+
@Nonnull String namespace,
49+
@Nonnull String workflowId,
50+
@Nonnull String runId,
51+
long scheduledEventId,
52+
int attempt) {
53+
this(
54+
namespace,
55+
WorkflowExecution.newBuilder()
56+
.setWorkflowId(Objects.requireNonNull(workflowId))
57+
.setRunId(Objects.requireNonNull(runId))
58+
.build(),
59+
scheduledEventId,
60+
attempt);
61+
}
62+
63+
NexusTaskToken(@Nonnull ExecutionId executionId, long scheduledEventId, int attempt) {
64+
this.executionId = Objects.requireNonNull(executionId);
65+
this.scheduledEventId = scheduledEventId;
66+
this.attempt = attempt;
67+
}
68+
69+
public ExecutionId getExecutionId() {
70+
return executionId;
71+
}
72+
73+
public long getScheduledEventId() {
74+
return scheduledEventId;
75+
}
76+
77+
public long getAttempt() {
78+
return attempt;
79+
}
80+
81+
/** Used for task tokens. */
82+
public ByteString toBytes() {
83+
try (ByteArrayOutputStream bout = new ByteArrayOutputStream();
84+
DataOutputStream out = new DataOutputStream(bout)) {
85+
out.writeUTF(executionId.getNamespace());
86+
WorkflowExecution execution = executionId.getExecution();
87+
out.writeUTF(execution.getWorkflowId());
88+
out.writeUTF(execution.getRunId());
89+
out.writeLong(scheduledEventId);
90+
out.writeInt(attempt);
91+
return ByteString.copyFrom(bout.toByteArray());
92+
} catch (IOException e) {
93+
throw Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asRuntimeException();
94+
}
95+
}
96+
97+
static NexusTaskToken fromBytes(ByteString serialized) {
98+
ByteArrayInputStream bin = new ByteArrayInputStream(serialized.toByteArray());
99+
DataInputStream in = new DataInputStream(bin);
100+
try {
101+
String namespace = in.readUTF();
102+
String workflowId = in.readUTF();
103+
String runId = in.readUTF();
104+
long scheduledEventId = in.readLong();
105+
int attempt = in.readInt();
106+
return new NexusTaskToken(namespace, workflowId, runId, scheduledEventId, attempt);
107+
} catch (IOException e) {
108+
throw Status.INVALID_ARGUMENT
109+
.withCause(e)
110+
.withDescription(e.getMessage())
111+
.asRuntimeException();
112+
}
113+
}
114+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/RequestContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public int getChange() {
124124
// If an eager dispatch was performed, it should be reset to null
125125
private WorkflowTask workflowTaskForMatching;
126126
private final List<ActivityTask> activityTasks = new ArrayList<>();
127+
private final List<TestWorkflowStore.NexusTask> nexusTasks = new ArrayList<>();
127128
private final List<Timer> timers = new ArrayList<>();
128129
private long workflowCompletedAtEventId = -1;
129130
private boolean needWorkflowTask;
@@ -157,6 +158,7 @@ public int getChange() {
157158

158159
void add(RequestContext ctx) {
159160
this.activityTasks.addAll(ctx.getActivityTasks());
161+
this.nexusTasks.addAll(ctx.getNexusTasks());
160162
this.timers.addAll(ctx.getTimers());
161163
this.events.addAll(ctx.getEvents());
162164
}
@@ -252,6 +254,10 @@ void addActivityTask(ActivityTask activityTask) {
252254
this.activityTasks.add(activityTask);
253255
}
254256

257+
void addNexusTask(TestWorkflowStore.NexusTask nexusTask) {
258+
this.nexusTasks.add(nexusTask);
259+
}
260+
255261
/**
256262
* @return cancellation handle
257263
*/
@@ -269,6 +275,10 @@ List<ActivityTask> getActivityTasks() {
269275
return activityTasks;
270276
}
271277

278+
List<TestWorkflowStore.NexusTask> getNexusTasks() {
279+
return nexusTasks;
280+
}
281+
272282
List<HistoryEvent> getEvents() {
273283
return events;
274284
}

0 commit comments

Comments
 (0)