Skip to content

Commit 776cb89

Browse files
Issue 489 partition key more types apache (#1264)
* [feature] partition key support more data types (#489) * [fix] update copyright year * [fix] partition tests for BinaryString keys * [fix] remove unnecessary test * update based on copilot suggestion. * add testPartitionedTable test * [server] whitespace issues * [feature] partition key support more data types (#489) * [fix] partition tests for BinaryString keys * [docs] change the docs on partition definition * [fix] delete empty files --------- Co-authored-by: ipolyzos <[email protected]>
1 parent 1f4ad5b commit 776cb89

File tree

9 files changed

+636
-54
lines changed

9 files changed

+636
-54
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.row.InternalRow;
2222
import org.apache.fluss.types.DataType;
2323
import org.apache.fluss.types.RowType;
24+
import org.apache.fluss.utils.PartitionUtils;
2425

2526
import java.util.ArrayList;
2627
import java.util.List;
@@ -33,12 +34,14 @@ public class PartitionGetter {
3334

3435
private final List<String> partitionKeys;
3536
private final List<InternalRow.FieldGetter> partitionFieldGetters;
37+
private final List<DataType> partitionTypes;
3638

3739
public PartitionGetter(RowType rowType, List<String> partitionKeys) {
3840
// check the partition column
3941
List<String> fieldNames = rowType.getFieldNames();
4042
this.partitionKeys = partitionKeys;
4143
partitionFieldGetters = new ArrayList<>();
44+
partitionTypes = new ArrayList<>();
4245
for (String partitionKey : partitionKeys) {
4346
int partitionColumnIndex = fieldNames.indexOf(partitionKey);
4447
checkArgument(
@@ -49,17 +52,21 @@ public PartitionGetter(RowType rowType, List<String> partitionKeys) {
4952

5053
// check the data type of the partition column
5154
DataType partitionColumnDataType = rowType.getTypeAt(partitionColumnIndex);
55+
partitionTypes.add(partitionColumnDataType);
5256
partitionFieldGetters.add(
5357
InternalRow.createFieldGetter(partitionColumnDataType, partitionColumnIndex));
5458
}
5559
}
5660

5761
public String getPartition(InternalRow row) {
5862
List<String> partitionValues = new ArrayList<>();
59-
for (InternalRow.FieldGetter partitionFieldGetter : partitionFieldGetters) {
63+
for (int i = 0; i < partitionFieldGetters.size(); i++) {
64+
InternalRow.FieldGetter partitionFieldGetter = partitionFieldGetters.get(i);
65+
DataType dataType = partitionTypes.get(i);
6066
Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
6167
checkNotNull(partitionValue, "Partition value shouldn't be null.");
62-
partitionValues.add(partitionValue.toString());
68+
partitionValues.add(
69+
PartitionUtils.convertValueOfType(partitionValue, dataType.getTypeRoot()));
6370
}
6471
ResolvedPartitionSpec resolvedPartitionSpec =
6572
new ResolvedPartitionSpec(partitionKeys, partitionValues);

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -647,29 +647,6 @@ void testGetServerNodes() throws Exception {
647647
assertThat(serverNodes).containsExactlyInAnyOrderElementsOf(expectedNodes);
648648
}
649649

650-
@Test
651-
void testCreateIllegalPartitionTable() {
652-
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
653-
TableDescriptor partitionedTable =
654-
TableDescriptor.builder()
655-
.schema(
656-
Schema.newBuilder()
657-
.column("id", DataTypes.STRING())
658-
.column("name", DataTypes.STRING())
659-
.column("dt", DataTypes.DATE())
660-
.build())
661-
.distributedBy(3, "id")
662-
.partitionedBy("name", "dt")
663-
.build();
664-
TablePath tablePath = TablePath.of(dbName, "test_create_illegal_partitioned_table_1");
665-
assertThatThrownBy(() -> admin.createTable(tablePath, partitionedTable, true).get())
666-
.cause()
667-
.isInstanceOf(InvalidTableException.class)
668-
.hasMessageContaining(
669-
"Currently, partitioned table supported partition key type are [STRING], "
670-
+ "but got partition key 'dt' with data type DATE.");
671-
}
672-
673650
@Test
674651
void testAddAndDropPartitions() throws Exception {
675652
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table;
19+
20+
import org.apache.fluss.client.admin.ClientToServerITCaseBase;
21+
import org.apache.fluss.client.table.writer.AppendWriter;
22+
import org.apache.fluss.metadata.PartitionInfo;
23+
import org.apache.fluss.metadata.Schema;
24+
import org.apache.fluss.metadata.TableDescriptor;
25+
import org.apache.fluss.metadata.TableInfo;
26+
import org.apache.fluss.metadata.TablePath;
27+
import org.apache.fluss.row.BinaryString;
28+
import org.apache.fluss.row.InternalRow;
29+
import org.apache.fluss.row.TimestampLtz;
30+
import org.apache.fluss.row.TimestampNtz;
31+
import org.apache.fluss.types.BigIntType;
32+
import org.apache.fluss.types.BinaryType;
33+
import org.apache.fluss.types.BooleanType;
34+
import org.apache.fluss.types.BytesType;
35+
import org.apache.fluss.types.CharType;
36+
import org.apache.fluss.types.DataTypeRoot;
37+
import org.apache.fluss.types.DataTypes;
38+
import org.apache.fluss.types.DateType;
39+
import org.apache.fluss.types.DoubleType;
40+
import org.apache.fluss.types.FloatType;
41+
import org.apache.fluss.types.IntType;
42+
import org.apache.fluss.types.LocalZonedTimestampType;
43+
import org.apache.fluss.types.SmallIntType;
44+
import org.apache.fluss.types.StringType;
45+
import org.apache.fluss.types.TimeType;
46+
import org.apache.fluss.types.TimestampType;
47+
import org.apache.fluss.types.TinyIntType;
48+
49+
import org.junit.jupiter.api.Test;
50+
51+
import java.util.ArrayList;
52+
import java.util.Arrays;
53+
import java.util.HashMap;
54+
import java.util.List;
55+
import java.util.Map;
56+
import java.util.stream.Collectors;
57+
58+
import static org.apache.fluss.testutils.DataTestUtils.row;
59+
import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
60+
import static org.assertj.core.api.Assertions.assertThat;
61+
62+
/** IT case for Fluss partitioned tables whose partition keys use different data types. */
63+
class NewPartitionedTableITCase extends ClientToServerITCaseBase {
64+
Schema.Builder schemaBuilder =
65+
Schema.newBuilder()
66+
.column("a", new StringType())
67+
.column("char", new CharType())
68+
.column("binary", new BinaryType())
69+
.column("boolean", new BooleanType())
70+
.column("bytes", new BytesType())
71+
.column("tinyInt", new TinyIntType())
72+
.column("smallInt", new SmallIntType())
73+
.column("int", new IntType())
74+
.column("bigInt", new BigIntType())
75+
.column("date", new DateType())
76+
.column("float", new FloatType())
77+
.column("double", new DoubleType())
78+
.column("time", new TimeType())
79+
.column("timeStampNTZ", new TimestampType())
80+
.column("timeStampLTZ", new LocalZonedTimestampType());
81+
82+
Schema schema = schemaBuilder.build();
83+
DataTypeRoot[] allPartitionKeyTypes =
84+
new DataTypeRoot[] {
85+
DataTypeRoot.STRING,
86+
DataTypeRoot.CHAR,
87+
DataTypeRoot.BINARY,
88+
DataTypeRoot.BOOLEAN,
89+
DataTypeRoot.BYTES,
90+
DataTypeRoot.TINYINT,
91+
DataTypeRoot.SMALLINT,
92+
DataTypeRoot.INTEGER,
93+
DataTypeRoot.BIGINT,
94+
DataTypeRoot.DATE,
95+
DataTypeRoot.FLOAT,
96+
DataTypeRoot.DOUBLE,
97+
DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
98+
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
99+
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
100+
};
101+
102+
Object[] allPartitionKeyValues =
103+
new Object[] {
104+
BinaryString.fromString("a"),
105+
BinaryString.fromString("F"),
106+
new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0b11111111},
107+
true,
108+
new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0b11111111},
109+
(byte) 100,
110+
(short) -32760, // smallint
111+
299000, // Integer
112+
1748662955428L, // Bigint
113+
20235, // Date
114+
5.73f, // Float
115+
5.73, // Double
116+
5402199, // Time
117+
TimestampNtz.fromMillis(1748662955428L), // TIMESTAMP_WITHOUT_TIME_ZONE
118+
TimestampLtz.fromEpochMillis(1748662955428L) // TIMESTAMP_WITH_LOCAL_TIME_ZONE
119+
};
120+
121+
Schema.Column[] extraColumn =
122+
new Schema.Column[] {
123+
new Schema.Column("a", new StringType()),
124+
new Schema.Column("char", new CharType()),
125+
new Schema.Column("binary", new BinaryType()),
126+
new Schema.Column("boolean", new BooleanType()),
127+
new Schema.Column("bytes", new BytesType()),
128+
new Schema.Column("tinyInt", new TinyIntType()),
129+
new Schema.Column("smallInt", new SmallIntType()),
130+
new Schema.Column("int", new IntType()),
131+
new Schema.Column("bigInt", new BigIntType()),
132+
new Schema.Column("date", new DateType()),
133+
new Schema.Column("float", new FloatType()),
134+
new Schema.Column("double", new DoubleType()),
135+
new Schema.Column("time", new TimeType()),
136+
new Schema.Column("timeStampNTZ", new TimestampType()),
137+
new Schema.Column("timeStampLTZ", new LocalZonedTimestampType())
138+
};
139+
140+
List<String> result =
141+
Arrays.asList(
142+
"a",
143+
"F",
144+
"1020304050ff",
145+
"true",
146+
"1020304050ff",
147+
"100",
148+
"-32760",
149+
"299000",
150+
"1748662955428",
151+
"2025-05-27",
152+
"5_73",
153+
"5_73",
154+
"01-30-02_199",
155+
"2025-05-31-03-42-35_428",
156+
"2025-05-31-03-42-35_428");
157+
158+
@Test
159+
public void testPartitionedTable() throws Exception {
160+
TablePath tablePath = TablePath.of("fluss", "person");
161+
162+
TableDescriptor partitionedTable =
163+
TableDescriptor.builder()
164+
.schema(
165+
Schema.newBuilder()
166+
.column("id", DataTypes.INT())
167+
.column("name", DataTypes.STRING())
168+
.column("dt", DataTypes.DATE())
169+
.build())
170+
.distributedBy(3, "name")
171+
.partitionedBy("id", "dt")
172+
.build();
173+
admin.createTable(tablePath, partitionedTable, true).get();
174+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
175+
176+
assertThat(tableInfo).isNotNull();
177+
assertThat(tableInfo.getTablePath()).isEqualTo(tablePath);
178+
179+
List<String> partitionKeys = tableInfo.getPartitionKeys();
180+
assertThat(partitionKeys).hasSize(2);
181+
assertThat(partitionKeys).containsExactly("id", "dt");
182+
}
183+
184+
@Test
185+
public void testMultipleTypedPartitionedTable() throws Exception {
186+
187+
for (int i = 0; i < allPartitionKeyTypes.length; i++) {
188+
String partitionKey = extraColumn[i].getName();
189+
TablePath tablePath =
190+
TablePath.of("test_part_db_" + i, "test_static_partitioned_pk_table_" + i);
191+
createPartitionedTable(tablePath, partitionKey);
192+
String partitionValue =
193+
convertValueOfType(allPartitionKeyValues[i], allPartitionKeyTypes[i]);
194+
195+
admin.createPartition(tablePath, newPartitionSpec(partitionKey, partitionValue), true)
196+
.get();
197+
198+
Map<String, Long> partitionIdByNames =
199+
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 1);
200+
201+
List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
202+
List<String> expectedPartitions = new ArrayList<>(partitionIdByNames.keySet());
203+
assertThat(
204+
partitionInfos.stream()
205+
.map(PartitionInfo::getPartitionName)
206+
.collect(Collectors.toList()))
207+
.containsExactlyInAnyOrderElementsOf(expectedPartitions);
208+
209+
Table table = conn.getTable(tablePath);
210+
AppendWriter appendWriter = table.newAppend().createWriter();
211+
Map<Long, List<InternalRow>> expectPartitionAppendRows = new HashMap<>();
212+
for (String partition : partitionIdByNames.keySet()) {
213+
for (int j = 0; j < allPartitionKeyValues.length; j++) {
214+
InternalRow row = row(allPartitionKeyValues);
215+
appendWriter.append(row);
216+
expectPartitionAppendRows
217+
.computeIfAbsent(
218+
partitionIdByNames.get(partition), k -> new ArrayList<>())
219+
.add(row);
220+
}
221+
}
222+
appendWriter.flush();
223+
224+
assertThat(admin.listPartitionInfos(tablePath).get().get(0).getPartitionName())
225+
.isEqualTo(result.get(i));
226+
}
227+
}
228+
229+
private void createPartitionedTable(TablePath tablePath, String partitionKey) throws Exception {
230+
TableDescriptor partitionTableDescriptor =
231+
TableDescriptor.builder().schema(schema).partitionedBy(partitionKey).build();
232+
createTable(tablePath, partitionTableDescriptor, false);
233+
}
234+
}

0 commit comments

Comments
 (0)