Skip to content

Commit 2163b8f

Browse files
authored
Test server support for async Nexus operations (#2198)
* Test server support for async Nexus operations * Refactor cancel states * tests and fixes * refactor retry logic * failure handling * tests * task timeout * tests * feedback * license * typo
1 parent a173dbe commit 2163b8f

File tree

10 files changed

+1597
-621
lines changed

10 files changed

+1597
-621
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.testservice;
22+
23+
import com.google.protobuf.ByteString;
24+
import io.grpc.Status;
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import java.io.*;
27+
import java.util.Objects;
28+
import javax.annotation.Nonnull;
29+
30+
public class NexusOperationRef {
31+
32+
@Nonnull private final ExecutionId executionId;
33+
private final long scheduledEventId;
34+
35+
NexusOperationRef(
36+
@Nonnull String namespace, @Nonnull WorkflowExecution execution, long scheduledEventId) {
37+
this(
38+
new ExecutionId(Objects.requireNonNull(namespace), Objects.requireNonNull(execution)),
39+
scheduledEventId);
40+
}
41+
42+
NexusOperationRef(
43+
@Nonnull String namespace,
44+
@Nonnull String workflowId,
45+
@Nonnull String runId,
46+
long scheduledEventId) {
47+
this(
48+
namespace,
49+
WorkflowExecution.newBuilder()
50+
.setWorkflowId(Objects.requireNonNull(workflowId))
51+
.setRunId(Objects.requireNonNull(runId))
52+
.build(),
53+
scheduledEventId);
54+
}
55+
56+
public NexusOperationRef(@Nonnull ExecutionId executionId, long scheduledEventId) {
57+
this.executionId = Objects.requireNonNull(executionId);
58+
this.scheduledEventId = scheduledEventId;
59+
}
60+
61+
public ExecutionId getExecutionId() {
62+
return executionId;
63+
}
64+
65+
public long getScheduledEventId() {
66+
return scheduledEventId;
67+
}
68+
69+
public ByteString toBytes() {
70+
try (ByteArrayOutputStream bout = new ByteArrayOutputStream();
71+
DataOutputStream out = new DataOutputStream(bout)) {
72+
out.writeUTF(executionId.getNamespace());
73+
WorkflowExecution execution = executionId.getExecution();
74+
out.writeUTF(execution.getWorkflowId());
75+
out.writeUTF(execution.getRunId());
76+
out.writeLong(scheduledEventId);
77+
return ByteString.copyFrom(bout.toByteArray());
78+
} catch (IOException e) {
79+
throw Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asRuntimeException();
80+
}
81+
}
82+
83+
public static NexusOperationRef fromBytes(ByteString serialized) {
84+
return fromBytes(serialized.toByteArray());
85+
}
86+
87+
public static NexusOperationRef fromBytes(byte[] serialized) {
88+
ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
89+
DataInputStream in = new DataInputStream(bin);
90+
try {
91+
String namespace = in.readUTF();
92+
String workflowId = in.readUTF();
93+
String runId = in.readUTF();
94+
long scheduledEventId = in.readLong();
95+
return new NexusOperationRef(namespace, workflowId, runId, scheduledEventId);
96+
} catch (IOException e) {
97+
throw Status.INVALID_ARGUMENT
98+
.withCause(e)
99+
.withDescription(e.getMessage())
100+
.asRuntimeException();
101+
}
102+
}
103+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/NexusTaskToken.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,74 +27,88 @@
2727
import java.util.Objects;
2828
import javax.annotation.Nonnull;
2929

30-
class NexusTaskToken {
30+
public class NexusTaskToken {
3131

32-
@Nonnull private final ExecutionId executionId;
33-
private final long scheduledEventId;
32+
@Nonnull private final NexusOperationRef ref;
3433
private final int attempt;
34+
private final boolean isCancel;
3535

3636
NexusTaskToken(
3737
@Nonnull String namespace,
3838
@Nonnull WorkflowExecution execution,
3939
long scheduledEventId,
40-
int attempt) {
40+
int attempt,
41+
boolean isCancel) {
4142
this(
4243
new ExecutionId(Objects.requireNonNull(namespace), Objects.requireNonNull(execution)),
4344
scheduledEventId,
44-
attempt);
45+
attempt,
46+
isCancel);
4547
}
4648

4749
NexusTaskToken(
4850
@Nonnull String namespace,
4951
@Nonnull String workflowId,
5052
@Nonnull String runId,
5153
long scheduledEventId,
52-
int attempt) {
54+
int attempt,
55+
boolean isCancel) {
5356
this(
5457
namespace,
5558
WorkflowExecution.newBuilder()
5659
.setWorkflowId(Objects.requireNonNull(workflowId))
5760
.setRunId(Objects.requireNonNull(runId))
5861
.build(),
5962
scheduledEventId,
60-
attempt);
63+
attempt,
64+
isCancel);
6165
}
6266

63-
NexusTaskToken(@Nonnull ExecutionId executionId, long scheduledEventId, int attempt) {
64-
this.executionId = Objects.requireNonNull(executionId);
65-
this.scheduledEventId = scheduledEventId;
66-
this.attempt = attempt;
67+
NexusTaskToken(
68+
@Nonnull ExecutionId executionId, long scheduledEventId, int attempt, boolean isCancel) {
69+
this(
70+
new NexusOperationRef(Objects.requireNonNull(executionId), scheduledEventId),
71+
attempt,
72+
isCancel);
6773
}
6874

69-
public ExecutionId getExecutionId() {
70-
return executionId;
75+
public NexusTaskToken(@Nonnull NexusOperationRef ref, int attempt, boolean isCancel) {
76+
this.ref = Objects.requireNonNull(ref);
77+
this.attempt = attempt;
78+
this.isCancel = isCancel;
7179
}
7280

73-
public long getScheduledEventId() {
74-
return scheduledEventId;
81+
public NexusOperationRef getOperationRef() {
82+
return ref;
7583
}
7684

7785
public long getAttempt() {
7886
return attempt;
7987
}
8088

89+
public boolean isCancel() {
90+
return isCancel;
91+
}
92+
8193
/** Used for task tokens. */
8294
public ByteString toBytes() {
8395
try (ByteArrayOutputStream bout = new ByteArrayOutputStream();
8496
DataOutputStream out = new DataOutputStream(bout)) {
97+
ExecutionId executionId = ref.getExecutionId();
8598
out.writeUTF(executionId.getNamespace());
8699
WorkflowExecution execution = executionId.getExecution();
87100
out.writeUTF(execution.getWorkflowId());
88101
out.writeUTF(execution.getRunId());
89-
out.writeLong(scheduledEventId);
102+
out.writeLong(ref.getScheduledEventId());
90103
out.writeInt(attempt);
104+
out.writeBoolean(isCancel);
91105
return ByteString.copyFrom(bout.toByteArray());
92106
} catch (IOException e) {
93107
throw Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asRuntimeException();
94108
}
95109
}
96110

97-
static NexusTaskToken fromBytes(ByteString serialized) {
111+
public static NexusTaskToken fromBytes(ByteString serialized) {
98112
ByteArrayInputStream bin = new ByteArrayInputStream(serialized.toByteArray());
99113
DataInputStream in = new DataInputStream(bin);
100114
try {
@@ -103,7 +117,8 @@ static NexusTaskToken fromBytes(ByteString serialized) {
103117
String runId = in.readUTF();
104118
long scheduledEventId = in.readLong();
105119
int attempt = in.readInt();
106-
return new NexusTaskToken(namespace, workflowId, runId, scheduledEventId, attempt);
120+
boolean isCancel = in.readBoolean();
121+
return new NexusTaskToken(namespace, workflowId, runId, scheduledEventId, attempt, isCancel);
107122
} catch (IOException e) {
108123
throw Status.INVALID_ARGUMENT
109124
.withCause(e)

0 commit comments

Comments
 (0)