Skip to content

Commit 2b4e377

Browse files
support DefaultValueTransform
1 parent 3c79348 commit 2b4e377

File tree

6 files changed

+340
-0
lines changed

6 files changed

+340
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.types.DataType;
23+
import org.apache.paimon.utils.DefaultValueUtils;
24+
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Objects;
28+
29+
import static org.apache.paimon.utils.Preconditions.checkArgument;
30+
31+
/**
32+
* A {@link Transform} which always returns the default value of the input field's {@link DataType}.
33+
*/
34+
public class DefaultValueTransform implements Transform {
35+
36+
private static final long serialVersionUID = 1L;
37+
38+
private final FieldRef fieldRef;
39+
40+
public DefaultValueTransform(FieldRef fieldRef) {
41+
this.fieldRef = Objects.requireNonNull(fieldRef, "fieldRef must not be null");
42+
}
43+
44+
public FieldRef fieldRef() {
45+
return fieldRef;
46+
}
47+
48+
@Override
49+
public List<Object> inputs() {
50+
return Collections.singletonList(fieldRef);
51+
}
52+
53+
@Override
54+
public DataType outputType() {
55+
return fieldRef.type();
56+
}
57+
58+
@Override
59+
public Object transform(InternalRow row) {
60+
return DefaultValueUtils.defaultValue(fieldRef.type());
61+
}
62+
63+
@Override
64+
public Transform copyWithNewInputs(List<Object> inputs) {
65+
List<Object> nonNullInputs =
66+
Objects.requireNonNull(inputs, "DefaultValueTransform expects non-null inputs");
67+
checkArgument(nonNullInputs.size() == 1, "DefaultValueTransform expects 1 input");
68+
checkArgument(
69+
nonNullInputs.get(0) instanceof FieldRef,
70+
"DefaultValueTransform input must be FieldRef");
71+
return new DefaultValueTransform((FieldRef) nonNullInputs.get(0));
72+
}
73+
74+
@Override
75+
public boolean equals(Object o) {
76+
if (o == null || getClass() != o.getClass()) {
77+
return false;
78+
}
79+
DefaultValueTransform that = (DefaultValueTransform) o;
80+
return Objects.equals(fieldRef, that.fieldRef);
81+
}
82+
83+
@Override
84+
public int hashCode() {
85+
return Objects.hashCode(fieldRef);
86+
}
87+
88+
@Override
89+
public String toString() {
90+
return "DefaultValueTransform{" + "fieldRef=" + fieldRef + '}';
91+
}
92+
}

paimon-common/src/main/java/org/apache/paimon/utils/DefaultValueUtils.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,30 @@
2121
import org.apache.paimon.casting.CastExecutor;
2222
import org.apache.paimon.casting.CastExecutors;
2323
import org.apache.paimon.data.BinaryString;
24+
import org.apache.paimon.data.Blob;
25+
import org.apache.paimon.data.Decimal;
26+
import org.apache.paimon.data.GenericArray;
27+
import org.apache.paimon.data.GenericMap;
28+
import org.apache.paimon.data.GenericRow;
29+
import org.apache.paimon.data.Timestamp;
30+
import org.apache.paimon.data.variant.GenericVariant;
31+
import org.apache.paimon.data.variant.Variant;
32+
import org.apache.paimon.types.BinaryType;
2433
import org.apache.paimon.types.DataType;
34+
import org.apache.paimon.types.DecimalType;
35+
import org.apache.paimon.types.RowType;
2536
import org.apache.paimon.types.VarCharType;
2637

2738
import javax.annotation.Nullable;
2839

40+
import java.math.BigDecimal;
41+
import java.util.Collections;
42+
2943
/** Utils for default value. */
3044
public class DefaultValueUtils {
3145

46+
private static final Variant NULL_VARIANT = GenericVariant.fromJson("null");
47+
3248
public static Object convertDefaultValue(DataType dataType, String defaultValueStr) {
3349
@SuppressWarnings("unchecked")
3450
CastExecutor<Object, Object> resolve =
@@ -58,4 +74,58 @@ public static void validateDefaultValue(DataType dataType, @Nullable String defa
5874
"Unsupported default value `" + defaultValueStr + "` for type " + dataType, e);
5975
}
6076
}
77+
78+
/** Creates a default value object for the given {@link DataType}. */
79+
public static Object defaultValue(DataType dataType) {
80+
switch (dataType.getTypeRoot()) {
81+
case BOOLEAN:
82+
return false;
83+
case TINYINT:
84+
return (byte) 0;
85+
case SMALLINT:
86+
return (short) 0;
87+
case INTEGER:
88+
case DATE:
89+
case TIME_WITHOUT_TIME_ZONE:
90+
return 0;
91+
case BIGINT:
92+
return 0L;
93+
case FLOAT:
94+
return 0.0f;
95+
case DOUBLE:
96+
return 0.0d;
97+
case DECIMAL:
98+
DecimalType decimalType = (DecimalType) dataType;
99+
return Decimal.fromBigDecimal(
100+
BigDecimal.ZERO, decimalType.getPrecision(), decimalType.getScale());
101+
case CHAR:
102+
case VARCHAR:
103+
return BinaryString.fromString("");
104+
case BINARY:
105+
return new byte[((BinaryType) dataType).getLength()];
106+
case VARBINARY:
107+
return new byte[0];
108+
case TIMESTAMP_WITHOUT_TIME_ZONE:
109+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
110+
return Timestamp.fromEpochMillis(0);
111+
case ARRAY:
112+
return new GenericArray(new Object[0]);
113+
case MAP:
114+
case MULTISET:
115+
return new GenericMap(Collections.emptyMap());
116+
case ROW:
117+
RowType rowType = (RowType) dataType;
118+
GenericRow row = new GenericRow(rowType.getFieldCount());
119+
for (int i = 0; i < rowType.getFieldCount(); i++) {
120+
row.setField(i, defaultValue(rowType.getTypeAt(i)));
121+
}
122+
return row;
123+
case VARIANT:
124+
return NULL_VARIANT;
125+
case BLOB:
126+
return Blob.fromData(new byte[0]);
127+
default:
128+
throw new UnsupportedOperationException("Unsupported type: " + dataType);
129+
}
130+
}
61131
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import org.apache.paimon.data.GenericRow;
22+
import org.apache.paimon.data.InternalMap;
23+
import org.apache.paimon.types.DataType;
24+
import org.apache.paimon.types.DataTypeRoot;
25+
import org.apache.paimon.types.DataTypes;
26+
import org.apache.paimon.utils.DefaultValueUtils;
27+
import org.apache.paimon.utils.InternalRowUtils;
28+
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.params.ParameterizedTest;
31+
import org.junit.jupiter.params.provider.MethodSource;
32+
33+
import java.util.Collections;
34+
import java.util.stream.Stream;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class DefaultValueTransformTest {
39+
40+
@ParameterizedTest
41+
@MethodSource("allTypes")
42+
void testReturnDefaultValueForAllTypes(DataType type) {
43+
DefaultValueTransform transform = new DefaultValueTransform(new FieldRef(0, "f0", type));
44+
assertThat(transform.outputType()).isEqualTo(type);
45+
46+
Object expected = DefaultValueUtils.defaultValue(type);
47+
Object actual = transform.transform(GenericRow.of(123));
48+
if (type.getTypeRoot() == DataTypeRoot.MULTISET) {
49+
assertThat(actual).isInstanceOf(InternalMap.class);
50+
assertThat(((InternalMap) actual).size()).isEqualTo(0);
51+
} else {
52+
assertThat(InternalRowUtils.equals(actual, expected, type)).isTrue();
53+
}
54+
}
55+
56+
@Test
57+
void testCopyWithNewInputs() {
58+
FieldRef ref0 = new FieldRef(0, "f0", DataTypes.INT());
59+
FieldRef ref3 = new FieldRef(3, "f0", DataTypes.INT());
60+
61+
DefaultValueTransform transform = new DefaultValueTransform(ref0);
62+
Transform copied = transform.copyWithNewInputs(Collections.singletonList(ref3));
63+
64+
assertThat(copied).isEqualTo(new DefaultValueTransform(ref3));
65+
assertThat(copied.outputType()).isEqualTo(DataTypes.INT());
66+
assertThat(copied.transform(GenericRow.of((Object) null))).isEqualTo(0);
67+
}
68+
69+
private static Stream<DataType> allTypes() {
70+
return Stream.of(
71+
// numeric
72+
DataTypes.TINYINT(),
73+
DataTypes.SMALLINT(),
74+
DataTypes.INT(),
75+
DataTypes.BIGINT(),
76+
DataTypes.FLOAT(),
77+
DataTypes.DOUBLE(),
78+
DataTypes.DECIMAL(10, 2),
79+
80+
// boolean
81+
DataTypes.BOOLEAN(),
82+
83+
// string
84+
DataTypes.STRING(),
85+
DataTypes.CHAR(3),
86+
DataTypes.VARCHAR(20),
87+
88+
// binary
89+
DataTypes.BYTES(),
90+
DataTypes.BINARY(8),
91+
DataTypes.VARBINARY(12),
92+
93+
// datetime
94+
DataTypes.DATE(),
95+
DataTypes.TIME(),
96+
DataTypes.TIME(9),
97+
DataTypes.TIMESTAMP(),
98+
DataTypes.TIMESTAMP_MILLIS(),
99+
DataTypes.TIMESTAMP(9),
100+
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
101+
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9),
102+
DataTypes.TIMESTAMP_LTZ_MILLIS(),
103+
104+
// complex
105+
DataTypes.ARRAY(DataTypes.INT()),
106+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.TIMESTAMP())),
107+
DataTypes.MAP(DataTypes.VARCHAR(10), DataTypes.TIMESTAMP()),
108+
DataTypes.MULTISET(DataTypes.STRING()),
109+
DataTypes.ROW(
110+
DataTypes.FIELD(0, "a", DataTypes.INT()),
111+
DataTypes.FIELD(1, "b", DataTypes.STRING())),
112+
113+
// special
114+
DataTypes.VARIANT(),
115+
DataTypes.BLOB(),
116+
117+
// not-null variants (exercise nullability flag on type)
118+
DataTypes.INT().copy(false),
119+
DataTypes.STRING().copy(false),
120+
DataTypes.ARRAY(DataTypes.INT()).copy(false));
121+
}
122+
}

paimon-core/src/main/java/org/apache/paimon/utils/TransformJsonSerde.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.predicate.CastTransform;
2323
import org.apache.paimon.predicate.ConcatTransform;
2424
import org.apache.paimon.predicate.ConcatWsTransform;
25+
import org.apache.paimon.predicate.DefaultValueTransform;
2526
import org.apache.paimon.predicate.FieldRef;
2627
import org.apache.paimon.predicate.FieldTransform;
2728
import org.apache.paimon.predicate.HashMaskTransform;
@@ -59,6 +60,7 @@ public class TransformJsonSerde {
5960
private static final String TRANSFORM_TYPE_MASK = "mask";
6061
private static final String TRANSFORM_TYPE_HASH = "hash";
6162
private static final String TRANSFORM_TYPE_NULL = "null";
63+
private static final String TRANSFORM_TYPE_DEFAULT = "default";
6264
private static final String TRANSFORM_TYPE_LITERAL = "literal";
6365

6466
private static final String FIELD_INPUTS = "inputs";
@@ -160,6 +162,13 @@ public static ObjectNode toJsonNode(Transform transform) {
160162
node.set(FIELD_FIELD, fieldRefToJsonNode(nullTransform.fieldRef()));
161163
return node;
162164
}
165+
if (transform instanceof DefaultValueTransform) {
166+
DefaultValueTransform defaultValueTransform = (DefaultValueTransform) transform;
167+
ObjectNode node = MAPPER.createObjectNode();
168+
node.put(FIELD_TYPE, TRANSFORM_TYPE_DEFAULT);
169+
node.set(FIELD_FIELD, fieldRefToJsonNode(defaultValueTransform.fieldRef()));
170+
return node;
171+
}
163172

164173
throw new IllegalArgumentException(
165174
"Unsupported transform type: " + transform.getClass().getName());
@@ -218,6 +227,10 @@ public static Transform parseTransformNode(JsonNode node) {
218227
FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD));
219228
return new NullTransform(fieldRef);
220229
}
230+
if (TRANSFORM_TYPE_DEFAULT.equals(type)) {
231+
FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD));
232+
return new DefaultValueTransform(fieldRef);
233+
}
221234
throw new IllegalArgumentException("Unsupported transform type: " + type);
222235
}
223236

paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.options.Options;
3333
import org.apache.paimon.predicate.ConcatTransform;
3434
import org.apache.paimon.predicate.ConcatWsTransform;
35+
import org.apache.paimon.predicate.DefaultValueTransform;
3536
import org.apache.paimon.predicate.FieldRef;
3637
import org.apache.paimon.predicate.FieldTransform;
3738
import org.apache.paimon.predicate.GreaterThan;
@@ -486,6 +487,35 @@ void testColumnMaskingApplyOnRead() throws Exception {
486487
assertThat(isNull).containsExactly(true, true);
487488
assertThat(col2).containsExactly(1, 2);
488489
}
490+
491+
{
492+
// Mask columns as their type default values: col1 -> "", col2 -> 0
493+
Transform defaultValueMaskCol1 =
494+
new DefaultValueTransform(new FieldRef(0, "col1", DataTypes.STRING()));
495+
Transform defaultValueMaskCol2 =
496+
new DefaultValueTransform(new FieldRef(1, "col2", DataTypes.INT()));
497+
restCatalogServer.addTableColumnMasking(
498+
identifier,
499+
ImmutableMap.of("col1", defaultValueMaskCol1, "col2", defaultValueMaskCol2));
500+
501+
ReadBuilder readBuilder = table.newReadBuilder();
502+
List<Split> splits = readBuilder.newScan().plan().splits();
503+
TableRead read = readBuilder.newRead();
504+
RecordReader<InternalRow> reader = read.createReader(splits);
505+
506+
List<String> col1 = new ArrayList<>();
507+
List<Integer> col2 = new ArrayList<>();
508+
List<Boolean> col1IsNull = new ArrayList<>();
509+
reader.forEachRemaining(
510+
row -> {
511+
col1IsNull.add(row.isNullAt(0));
512+
col1.add(row.getString(0).toString());
513+
col2.add(row.getInt(1));
514+
});
515+
assertThat(col1IsNull).containsExactly(false, false);
516+
assertThat(col1).containsExactly("", "");
517+
assertThat(col2).containsExactly(0, 0);
518+
}
489519
}
490520

491521
@Test

0 commit comments

Comments
 (0)