Skip to content

Commit 3c9e819

Browse files
Add RawValue support (#2492)
Add RawValue support
1 parent 7460156 commit 3c9e819

File tree

9 files changed

+201
-3
lines changed

9 files changed

+201
-3
lines changed

temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
*
6767
* A {@link DataConverter} created on previous step may be bundled with {@link PayloadCodec}s using
6868
* {@link CodecDataConverter} or used directly if no custom {@link PayloadCodec}s are needed.
69+
*
70+
* <p>{@link DataConverter} is expected to pass the {@link RawValue} payload through without
71+
* conversion. Though it should still apply the {@link PayloadCodec} to the {@link RawValue}
72+
* payloads.
6973
*/
7074
public interface DataConverter {
7175

temporal-sdk/src/main/java/io/temporal/common/converter/DefaultDataConverter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
/**
2828
* A {@link DataConverter} that delegates payload conversion to type specific {@link
2929
* PayloadConverter} instances, and delegates failure conversions to a {@link FailureConverter}.
30-
*
31-
* @author fateev
3230
*/
3331
public class DefaultDataConverter extends PayloadAndFailureDataConverter {
3432

temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ public PayloadAndFailureDataConverter(@Nonnull List<PayloadConverter> converters
6565

6666
@Override
6767
public <T> Optional<Payload> toPayload(T value) throws DataConverterException {
68+
// Raw values payload should be passed through without conversion
69+
if (value instanceof RawValue) {
70+
RawValue rv = (RawValue) value;
71+
return Optional.of(rv.getPayload());
72+
}
73+
6874
for (PayloadConverter converter : converters) {
6975
Optional<Payload> result =
7076
(serializationContext != null ? converter.withContext(serializationContext) : converter)
@@ -77,9 +83,14 @@ public <T> Optional<Payload> toPayload(T value) throws DataConverterException {
7783
"No PayloadConverter is registered with this DataConverter that accepts value:" + value);
7884
}
7985

86+
@SuppressWarnings("unchecked")
8087
@Override
8188
public <T> T fromPayload(Payload payload, Class<T> valueClass, Type valueType)
8289
throws DataConverterException {
90+
if (valueClass == RawValue.class) {
91+
return (T) new RawValue(payload);
92+
}
93+
8394
try {
8495
String encoding =
8596
payload.getMetadataOrThrow(EncodingKeys.METADATA_ENCODING_KEY).toString(UTF_8);

temporal-sdk/src/main/java/io/temporal/common/converter/PayloadConverter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
* Used by the framework to serialize/deserialize method parameters that need to be sent over the
3434
* wire.
3535
*
36-
* @author fateev
36+
* <p>{@link PayloadConverter} is expected to pass the {@link RawValue} payload through without
37+
* conversion. Though it should still apply the {@link io.temporal.payload.codec.PayloadCodec} to
38+
* the {@link RawValue} payloads.
3739
*/
3840
public interface PayloadConverter {
3941

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.common.converter;
22+
23+
import io.temporal.api.common.v1.Payload;
24+
import java.util.Objects;
25+
26+
/**
27+
* RawValue is a representation of an unconverted, raw payload.
28+
*
29+
* <p>This type can be used as a parameter or return type in workflows and activities to pass
30+
* through a raw payload. Encoding/decoding of the payload is still done by the system.
31+
*/
32+
public final class RawValue {
33+
private final Payload payload;
34+
35+
public RawValue(Payload payload) {
36+
this.payload = Objects.requireNonNull(payload);
37+
}
38+
39+
public Payload getPayload() {
40+
return payload;
41+
}
42+
43+
@Override
44+
public boolean equals(Object o) {
45+
if (this == o) return true;
46+
if (o == null || getClass() != o.getClass()) return false;
47+
RawValue rawValue = (RawValue) o;
48+
return Objects.equals(payload, rawValue.payload);
49+
}
50+
51+
@Override
52+
public int hashCode() {
53+
return Objects.hash(payload);
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return "RawValue{" + "payload=" + payload + '}';
59+
}
60+
}

temporal-sdk/src/test/java/io/temporal/common/converter/CodecDataConverterTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626

2727
import com.google.protobuf.ByteString;
2828
import io.temporal.api.common.v1.Payload;
29+
import io.temporal.api.common.v1.Payloads;
2930
import io.temporal.api.failure.v1.Failure;
3031
import io.temporal.failure.ApplicationFailure;
3132
import io.temporal.failure.TemporalFailure;
3233
import io.temporal.payload.codec.PayloadCodec;
3334
import io.temporal.payload.codec.PayloadCodecException;
3435
import java.util.Collections;
3536
import java.util.List;
37+
import java.util.Optional;
3638
import java.util.stream.Collectors;
3739
import javax.annotation.Nonnull;
3840
import org.junit.Before;
@@ -127,6 +129,16 @@ public void testDetailsAreEncoded() {
127129
assertArrayEquals(new int[] {1, 2, 3}, decodedDetailsPayloads.get(2, int[].class, int[].class));
128130
}
129131

132+
@Test
133+
public void testRawValuePassThrough() {
134+
Payload p = Payload.newBuilder().setData(ByteString.copyFromUtf8("test")).build();
135+
Optional<Payloads> data = dataConverter.toPayloads(new RawValue(p));
136+
// Assert that the payload is still encoded
137+
assertTrue(isEncoded(data.get().getPayloads(0)));
138+
RawValue converted = dataConverter.fromPayloads(0, data, RawValue.class, RawValue.class);
139+
assertEquals(p, converted.getPayload());
140+
}
141+
130142
static boolean isEncoded(Payload payload) {
131143
return payload.getData().startsWith(PrefixPayloadCodec.PREFIX);
132144
}

temporal-sdk/src/test/java/io/temporal/common/converter/ProtoPayloadConverterTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@ public void testProto() {
6666
assertEquals(execution, converted);
6767
}
6868

69+
@Test
70+
public void testRawValue() {
71+
DataConverter converter = DefaultDataConverter.STANDARD_INSTANCE;
72+
ProtobufPayloadConverter protoConverter = new ProtobufPayloadConverter();
73+
WorkflowExecution execution =
74+
WorkflowExecution.newBuilder()
75+
.setWorkflowId(UUID.randomUUID().toString())
76+
.setRunId(UUID.randomUUID().toString())
77+
.build();
78+
Optional<Payloads> data =
79+
converter.toPayloads(new RawValue(protoConverter.toData(execution).get()));
80+
WorkflowExecution converted =
81+
converter.fromPayloads(0, data, WorkflowExecution.class, WorkflowExecution.class);
82+
assertEquals(execution, converted);
83+
}
84+
85+
@Test
86+
public void testRawValuePassThrough() {
87+
DataConverter converter = DefaultDataConverter.STANDARD_INSTANCE;
88+
Payload p = Payload.newBuilder().setData(ByteString.copyFromUtf8("test")).build();
89+
Optional<Payloads> data = converter.toPayloads(new RawValue(p));
90+
RawValue converted = converter.fromPayloads(0, data, RawValue.class, RawValue.class);
91+
assertEquals(p, converted.getPayload());
92+
}
93+
6994
@Test
7095
public void testCustomProto() {
7196
DataConverter converter =
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.activityTests;
22+
23+
import static org.junit.Assert.*;
24+
25+
import com.google.protobuf.ByteString;
26+
import io.temporal.activity.ActivityOptions;
27+
import io.temporal.api.common.v1.Payload;
28+
import io.temporal.common.converter.RawValue;
29+
import io.temporal.testing.internal.SDKTestWorkflowRule;
30+
import io.temporal.workflow.Workflow;
31+
import io.temporal.workflow.WorkflowInterface;
32+
import io.temporal.workflow.WorkflowMethod;
33+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
34+
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
35+
import java.time.Duration;
36+
import org.junit.Assert;
37+
import org.junit.Rule;
38+
import org.junit.Test;
39+
40+
public class TestRawValueActivity {
41+
42+
private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl();
43+
44+
@Rule
45+
public SDKTestWorkflowRule testWorkflowRule =
46+
SDKTestWorkflowRule.newBuilder()
47+
.setWorkflowTypes(RawValueTestWorkflowImpl.class)
48+
.setActivityImplementations(activitiesImpl)
49+
.build();
50+
51+
@Test
52+
public void testRawValueEndToEnd() {
53+
RawValueTestWorkflow workflowStub =
54+
testWorkflowRule.newWorkflowStubTimeoutOptions(RawValueTestWorkflow.class);
55+
// Intentionally don't set an encoding to test that the payload is passed through as is.
56+
Payload p = Payload.newBuilder().setData(ByteString.copyFromUtf8("test")).build();
57+
RawValue input = new RawValue(p);
58+
RawValue result = workflowStub.execute(input);
59+
Assert.assertEquals(input.getPayload(), result.getPayload());
60+
}
61+
62+
@WorkflowInterface
63+
public interface RawValueTestWorkflow {
64+
@WorkflowMethod
65+
RawValue execute(RawValue value);
66+
}
67+
68+
public static class RawValueTestWorkflowImpl implements RawValueTestWorkflow {
69+
@Override
70+
public RawValue execute(RawValue value) {
71+
ActivityOptions options =
72+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build();
73+
VariousTestActivities activities =
74+
Workflow.newActivityStub(VariousTestActivities.class, options);
75+
return activities.rawValueActivity(value);
76+
}
77+
}
78+
}

temporal-sdk/src/test/java/io/temporal/workflow/shared/TestActivities.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.temporal.client.ActivityCompletionClient;
3636
import io.temporal.client.ActivityNotExistsException;
3737
import io.temporal.common.MethodRetry;
38+
import io.temporal.common.converter.RawValue;
3839
import io.temporal.failure.ApplicationFailure;
3940
import io.temporal.testing.internal.SDKTestWorkflowRule;
4041
import java.io.Closeable;
@@ -145,6 +146,8 @@ public interface VariousTestActivities {
145146
void throwIOAnnotated();
146147

147148
List<UUID> activityUUIDList(List<UUID> arg);
149+
150+
RawValue rawValueActivity(RawValue arg);
148151
}
149152

150153
@ActivityInterface
@@ -411,6 +414,11 @@ public List<UUID> activityUUIDList(List<UUID> arg) {
411414
return arg;
412415
}
413416

417+
@Override
418+
public RawValue rawValueActivity(RawValue arg) {
419+
return arg;
420+
}
421+
414422
public int getLastAttempt() {
415423
return lastAttempt;
416424
}

0 commit comments

Comments
 (0)