Skip to content

Commit 9fd9fca

Browse files
authored
[java] Add Optional coder (#33019)
* [java] Add Optional coder * Add OptionalCoder to common types * NullableCoder-based implementation
1 parent fa3a283 commit 9fd9fca

File tree

3 files changed

+290
-0
lines changed

3 files changed

+290
-0
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Optional;
3536
import java.util.ServiceLoader;
3637
import java.util.Set;
3738
import org.apache.beam.sdk.annotations.Internal;
@@ -113,6 +114,8 @@ private CommonTypes() {
113114
Instant.class, CoderProviders.fromStaticMethods(Instant.class, InstantCoder.class));
114115
builder.put(
115116
Integer.class, CoderProviders.fromStaticMethods(Integer.class, VarIntCoder.class));
117+
builder.put(
118+
Optional.class, CoderProviders.fromStaticMethods(Optional.class, OptionalCoder.class));
116119
builder.put(
117120
Iterable.class, CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class));
118121
builder.put(KV.class, CoderProviders.fromStaticMethods(KV.class, KvCoder.class));
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.coders;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
import java.util.List;
24+
import java.util.Optional;
25+
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
26+
import org.apache.beam.sdk.values.TypeDescriptor;
27+
import org.apache.beam.sdk.values.TypeParameter;
28+
29+
/**
30+
* An {@link OptionalCoder} encodes optional values of type {@code T} using a nested {@code
31+
* Coder<T>}. {@link OptionalCoder} uses exactly 1 byte per entry to indicate whether the value is
32+
* empty, then adds the encoding of the inner coder for non-empty values.
33+
*
34+
* @param <T> the type of the values being transcoded
35+
*/
36+
public class OptionalCoder<T> extends StructuredCoder<Optional<T>> {
37+
public static <T> OptionalCoder<T> of(Coder<T> valueCoder) {
38+
return new OptionalCoder<>(valueCoder);
39+
}
40+
41+
/////////////////////////////////////////////////////////////////////////////
42+
43+
private final NullableCoder<T> nullableCoder;
44+
45+
private OptionalCoder(Coder<T> valueCoder) {
46+
this.nullableCoder = NullableCoder.of(valueCoder);
47+
}
48+
49+
/** Returns the inner {@link Coder} wrapped by this {@link OptionalCoder} instance. */
50+
public Coder<T> getValueCoder() {
51+
return nullableCoder.getValueCoder();
52+
}
53+
54+
@Override
55+
public void encode(Optional<T> value, OutputStream outStream) throws IOException, CoderException {
56+
nullableCoder.encode(value.orElse(null), outStream);
57+
}
58+
59+
@Override
60+
public Optional<T> decode(InputStream inStream) throws IOException, CoderException {
61+
return Optional.ofNullable(nullableCoder.decode(inStream));
62+
}
63+
64+
@Override
65+
public List<Coder<T>> getCoderArguments() {
66+
return nullableCoder.getCoderArguments();
67+
}
68+
69+
/**
70+
* {@code OptionalCoder} is deterministic if the nested {@code Coder} is.
71+
*
72+
* <p>{@inheritDoc}
73+
*/
74+
@Override
75+
public void verifyDeterministic() throws NonDeterministicException {
76+
nullableCoder.verifyDeterministic();
77+
}
78+
79+
/**
80+
* {@code OptionalCoder} is consistent with equals if the nested {@code Coder} is.
81+
*
82+
* <p>{@inheritDoc}
83+
*/
84+
@Override
85+
public boolean consistentWithEquals() {
86+
return nullableCoder.consistentWithEquals();
87+
}
88+
89+
@Override
90+
public Object structuralValue(Optional<T> value) {
91+
return nullableCoder.structuralValue(value.orElse(null));
92+
}
93+
94+
/**
95+
* Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and
96+
* counting the bytes. The size is known (1 byte) when {@code value} is empty, otherwise
97+
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
98+
*
99+
* <p>{@inheritDoc}
100+
*/
101+
@Override
102+
public void registerByteSizeObserver(Optional<T> value, ElementByteSizeObserver observer)
103+
throws Exception {
104+
nullableCoder.registerByteSizeObserver(value.orElse(null), observer);
105+
}
106+
107+
/**
108+
* Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and
109+
* counting the bytes. The size is known (1 byte) when {@code value} is empty, otherwise
110+
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
111+
*
112+
* <p>{@inheritDoc}
113+
*/
114+
@Override
115+
protected long getEncodedElementByteSize(Optional<T> value) throws Exception {
116+
return nullableCoder.getEncodedElementByteSize(value.orElse(null));
117+
}
118+
119+
/**
120+
* {@code OptionalCoder} is cheap if {@code valueCoder} is cheap.
121+
*
122+
* <p>{@inheritDoc}
123+
*/
124+
@Override
125+
public boolean isRegisterByteSizeObserverCheap(Optional<T> value) {
126+
return nullableCoder.isRegisterByteSizeObserverCheap(value.orElse(null));
127+
}
128+
129+
@Override
130+
public TypeDescriptor<Optional<T>> getEncodedTypeDescriptor() {
131+
return new TypeDescriptor<Optional<T>>() {}.where(
132+
new TypeParameter<T>() {}, getValueCoder().getEncodedTypeDescriptor());
133+
}
134+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.coders;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.equalTo;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertTrue;
25+
26+
import java.io.ByteArrayInputStream;
27+
import java.io.InputStream;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.Optional;
31+
import org.apache.beam.sdk.testing.CoderProperties;
32+
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
33+
import org.apache.beam.sdk.values.TypeDescriptor;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
35+
import org.junit.Rule;
36+
import org.junit.Test;
37+
import org.junit.rules.ExpectedException;
38+
import org.junit.runner.RunWith;
39+
import org.junit.runners.JUnit4;
40+
41+
/** Unit tests for {@link OptionalCoder}. */
42+
@RunWith(JUnit4.class)
43+
public class OptionalCoderTest {
44+
45+
private static final Coder<Optional<String>> TEST_CODER = OptionalCoder.of(StringUtf8Coder.of());
46+
47+
private static final List<Optional<String>> TEST_VALUES =
48+
Arrays.asList(
49+
Optional.of(""),
50+
Optional.of("a"),
51+
Optional.of("13"),
52+
Optional.of("hello"),
53+
Optional.empty(),
54+
Optional.of("a longer string with spaces and all that"),
55+
Optional.of("a string with a \n newline"),
56+
Optional.of("スタリング"));
57+
58+
@Test
59+
public void testDecodeEncodeContentsInSameOrder() throws Exception {
60+
for (Optional<String> value : TEST_VALUES) {
61+
CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
62+
}
63+
}
64+
65+
@Test
66+
public void testCoderSerializable() throws Exception {
67+
CoderProperties.coderSerializable(TEST_CODER);
68+
}
69+
70+
@Test
71+
public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
72+
CoderProperties.coderSerializable(OptionalCoder.of(GlobalWindow.Coder.INSTANCE));
73+
}
74+
75+
/**
76+
* Generated data to check that the wire format has not changed. To regenerate, see {@code
77+
* PrintBase64Encodings}.
78+
*
79+
* @see PrintBase64Encodings
80+
*/
81+
private static final List<String> TEST_ENCODINGS =
82+
Arrays.asList(
83+
"AQA",
84+
"AQFh",
85+
"AQIxMw",
86+
"AQVoZWxsbw",
87+
"AA",
88+
"AShhIGxvbmdlciBzdHJpbmcgd2l0aCBzcGFjZXMgYW5kIGFsbCB0aGF0",
89+
"ARlhIHN0cmluZyB3aXRoIGEgCiBuZXdsaW5l",
90+
"AQ_jgrnjgr_jg6rjg7PjgrA");
91+
92+
@Test
93+
public void testWireFormatEncode() throws Exception {
94+
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
95+
}
96+
97+
@Test
98+
public void testEncodedSize() throws Exception {
99+
OptionalCoder<Double> coder = OptionalCoder.of(DoubleCoder.of());
100+
assertEquals(1, coder.getEncodedElementByteSize(Optional.empty()));
101+
assertEquals(9, coder.getEncodedElementByteSize(Optional.of(5.0)));
102+
}
103+
104+
@Test
105+
public void testEncodedSizeNested() throws Exception {
106+
OptionalCoder<String> varLenCoder = OptionalCoder.of(StringUtf8Coder.of());
107+
assertEquals(1, varLenCoder.getEncodedElementByteSize(Optional.empty()));
108+
assertEquals(6, varLenCoder.getEncodedElementByteSize(Optional.of("spam")));
109+
}
110+
111+
@Test
112+
public void testObserverIsCheap() throws Exception {
113+
OptionalCoder<Double> coder = OptionalCoder.of(DoubleCoder.of());
114+
assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.of(5.0)));
115+
}
116+
117+
@Test
118+
public void testObserverIsNotCheap() throws Exception {
119+
OptionalCoder<List<String>> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of()));
120+
assertFalse(coder.isRegisterByteSizeObserverCheap(Optional.of(ImmutableList.of("hi", "test"))));
121+
}
122+
123+
@Test
124+
public void testObserverIsAlwaysCheapForEmptyValues() throws Exception {
125+
OptionalCoder<List<String>> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of()));
126+
assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.empty()));
127+
}
128+
129+
@Test
130+
public void testStructuralValueConsistentWithEquals() throws Exception {
131+
CoderProperties.structuralValueConsistentWithEquals(
132+
TEST_CODER, Optional.empty(), Optional.empty());
133+
}
134+
135+
@Rule public ExpectedException thrown = ExpectedException.none();
136+
137+
@Test
138+
public void testDecodingError() throws Exception {
139+
thrown.expect(CoderException.class);
140+
thrown.expectMessage(
141+
equalTo("NullableCoder expects either a byte valued 0 (null) or 1 (present), got 5"));
142+
143+
InputStream input = new ByteArrayInputStream(new byte[] {5});
144+
TEST_CODER.decode(input);
145+
}
146+
147+
@Test
148+
public void testEncodedTypeDescriptor() throws Exception {
149+
TypeDescriptor<Optional<String>> expectedTypeDescriptor =
150+
new TypeDescriptor<Optional<String>>() {};
151+
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(expectedTypeDescriptor));
152+
}
153+
}

0 commit comments

Comments
 (0)