/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 18 | + |
| 19 | +package org.apache.paimon.table; |
| 20 | + |
| 21 | +import org.apache.paimon.CoreOptions; |
| 22 | +import org.apache.paimon.catalog.Identifier; |
| 23 | +import org.apache.paimon.data.BinaryString; |
| 24 | +import org.apache.paimon.data.DataFormatTestUtil; |
| 25 | +import org.apache.paimon.data.GenericRow; |
| 26 | +import org.apache.paimon.data.InternalRow; |
| 27 | +import org.apache.paimon.schema.Schema; |
| 28 | +import org.apache.paimon.table.sink.InnerTableCommit; |
| 29 | +import org.apache.paimon.table.sink.StreamTableWrite; |
| 30 | +import org.apache.paimon.table.source.Split; |
| 31 | +import org.apache.paimon.table.source.TableRead; |
| 32 | +import org.apache.paimon.types.DataTypes; |
| 33 | + |
| 34 | +import org.junit.jupiter.params.ParameterizedTest; |
| 35 | +import org.junit.jupiter.params.provider.Arguments; |
| 36 | +import org.junit.jupiter.params.provider.MethodSource; |
| 37 | + |
| 38 | +import java.util.ArrayList; |
| 39 | +import java.util.Arrays; |
| 40 | +import java.util.Collections; |
| 41 | +import java.util.List; |
| 42 | +import java.util.Map; |
| 43 | + |
| 44 | +import static org.apache.paimon.CoreOptions.BUCKET; |
| 45 | +import static org.apache.paimon.table.SimpleTableTestBase.getResult; |
| 46 | +import static org.assertj.core.api.Assertions.assertThat; |
| 47 | +import static org.junit.jupiter.params.provider.Arguments.arguments; |
| 48 | + |
| 49 | +/** Unit tests for overwrite table. */ |
| 50 | +public class OverwriteTableTest extends TableTestBase { |
| 51 | + |
| 52 | + @ParameterizedTest(name = "dynamic = {0}, partition={2}") |
| 53 | + @MethodSource("overwriteTestData") |
| 54 | + public void testOverwriteAppend( |
| 55 | + boolean dynamicPartitionOverwrite, |
| 56 | + List<InternalRow> overwriteData, |
| 57 | + Map<String, String> overwritePartition, |
| 58 | + List<String> expected) |
| 59 | + throws Exception { |
| 60 | + innerTestOverwrite( |
| 61 | + false, dynamicPartitionOverwrite, overwriteData, overwritePartition, expected); |
| 62 | + } |
| 63 | + |
| 64 | + @ParameterizedTest(name = "dynamic = {0}, partition={2}") |
| 65 | + @MethodSource("overwriteTestData") |
| 66 | + public void testOverwritePrimaryKey( |
| 67 | + boolean dynamicPartitionOverwrite, |
| 68 | + List<InternalRow> overwriteData, |
| 69 | + Map<String, String> overwritePartition, |
| 70 | + List<String> expected) |
| 71 | + throws Exception { |
| 72 | + innerTestOverwrite( |
| 73 | + true, dynamicPartitionOverwrite, overwriteData, overwritePartition, expected); |
| 74 | + } |
| 75 | + |
| 76 | + private void innerTestOverwrite( |
| 77 | + boolean withPrimaryKey, |
| 78 | + boolean dynamicPartitionOverwrite, |
| 79 | + List<InternalRow> overwriteData, |
| 80 | + Map<String, String> overwritePartition, |
| 81 | + List<String> expected) |
| 82 | + throws Exception { |
| 83 | + Identifier identifier = identifier("T"); |
| 84 | + Schema.Builder builder = |
| 85 | + Schema.newBuilder() |
| 86 | + .column("pk", DataTypes.INT()) |
| 87 | + .column("pt0", DataTypes.INT()) |
| 88 | + .column("pt1", DataTypes.STRING()) |
| 89 | + .column("v", DataTypes.STRING()) |
| 90 | + .partitionKeys("pt0", "pt1"); |
| 91 | + if (withPrimaryKey) { |
| 92 | + builder = builder.primaryKey("pk", "pt0", "pt1"); |
| 93 | + builder.option(BUCKET.key(), "1"); |
| 94 | + } |
| 95 | + catalog.createTable(identifier, builder.build(), true); |
| 96 | + Table originTable = catalog.getTable(identifier("T")); |
| 97 | + FileStoreTable table = (FileStoreTable) originTable; |
| 98 | + if (!dynamicPartitionOverwrite) { |
| 99 | + table = |
| 100 | + table.copy( |
| 101 | + Collections.singletonMap( |
| 102 | + CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false")); |
| 103 | + } |
| 104 | + |
| 105 | + // prepare data |
| 106 | + // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'), |
| 107 | + // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon') |
| 108 | + // (7, 2, 'A', 'Test') |
| 109 | + // (8, 2, 'B', 'Case') |
| 110 | + try (StreamTableWrite write = table.newWrite(commitUser); |
| 111 | + InnerTableCommit commit = table.newCommit(commitUser)) { |
| 112 | + write.write(overwriteRow(1, 1, "A", "Hi")); |
| 113 | + write.write(overwriteRow(2, 1, "A", "Hello")); |
| 114 | + write.write(overwriteRow(3, 1, "A", "World")); |
| 115 | + write.write(overwriteRow(4, 1, "B", "To")); |
| 116 | + write.write(overwriteRow(5, 1, "B", "Apache")); |
| 117 | + write.write(overwriteRow(6, 1, "B", "Paimon")); |
| 118 | + write.write(overwriteRow(7, 2, "A", "Test")); |
| 119 | + write.write(overwriteRow(8, 2, "B", "Case")); |
| 120 | + commit.commit(0, write.prepareCommit(true, 0)); |
| 121 | + } |
| 122 | + |
| 123 | + // overwrite data |
| 124 | + try (StreamTableWrite write = table.newWrite(commitUser).withIgnorePreviousFiles(true); |
| 125 | + InnerTableCommit commit = table.newCommit(commitUser)) { |
| 126 | + for (InternalRow row : overwriteData) { |
| 127 | + write.write(row); |
| 128 | + } |
| 129 | + commit.withOverwrite(overwritePartition).commit(1, write.prepareCommit(true, 1)); |
| 130 | + } |
| 131 | + |
| 132 | + // validate |
| 133 | + List<Split> splits = new ArrayList<>(table.newSnapshotReader().read().dataSplits()); |
| 134 | + TableRead read = table.newRead(); |
| 135 | + assertThat( |
| 136 | + getResult( |
| 137 | + read, |
| 138 | + splits, |
| 139 | + row -> |
| 140 | + DataFormatTestUtil.toStringNoRowKind( |
| 141 | + row, originTable.rowType()))) |
| 142 | + .hasSameElementsAs(expected); |
| 143 | + } |
| 144 | + |
| 145 | + private static List<Arguments> overwriteTestData() { |
| 146 | + // dynamic, overwrite data, overwrite partition, expected |
| 147 | + return Arrays.asList( |
| 148 | + // nothing happen |
| 149 | + arguments( |
| 150 | + true, |
| 151 | + Collections.emptyList(), |
| 152 | + Collections.emptyMap(), |
| 153 | + Arrays.asList( |
| 154 | + "1, 1, A, Hi", |
| 155 | + "2, 1, A, Hello", |
| 156 | + "3, 1, A, World", |
| 157 | + "4, 1, B, To", |
| 158 | + "5, 1, B, Apache", |
| 159 | + "6, 1, B, Paimon", |
| 160 | + "7, 2, A, Test", |
| 161 | + "8, 2, B, Case")), |
| 162 | + // delete all data |
| 163 | + arguments( |
| 164 | + false, |
| 165 | + Collections.emptyList(), |
| 166 | + Collections.emptyMap(), |
| 167 | + Collections.emptyList()), |
| 168 | + // specify one partition key |
| 169 | + arguments( |
| 170 | + true, |
| 171 | + Arrays.asList( |
| 172 | + overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")), |
| 173 | + Collections.singletonMap("pt0", "1"), |
| 174 | + Arrays.asList( |
| 175 | + "1, 1, A, Where", |
| 176 | + "2, 1, A, When", |
| 177 | + "4, 1, B, To", |
| 178 | + "5, 1, B, Apache", |
| 179 | + "6, 1, B, Paimon", |
| 180 | + "7, 2, A, Test", |
| 181 | + "8, 2, B, Case")), |
| 182 | + arguments( |
| 183 | + false, |
| 184 | + Arrays.asList( |
| 185 | + overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")), |
| 186 | + Collections.singletonMap("pt0", "1"), |
| 187 | + Arrays.asList( |
| 188 | + "1, 1, A, Where", |
| 189 | + "2, 1, A, When", |
| 190 | + "7, 2, A, Test", |
| 191 | + "8, 2, B, Case")), |
| 192 | + // all dynamic |
| 193 | + arguments( |
| 194 | + true, |
| 195 | + Arrays.asList( |
| 196 | + overwriteRow(4, 1, "B", "Where"), |
| 197 | + overwriteRow(5, 1, "B", "When"), |
| 198 | + overwriteRow(10, 2, "A", "Static"), |
| 199 | + overwriteRow(11, 2, "A", "Dynamic")), |
| 200 | + Collections.emptyMap(), |
| 201 | + Arrays.asList( |
| 202 | + "1, 1, A, Hi", |
| 203 | + "2, 1, A, Hello", |
| 204 | + "3, 1, A, World", |
| 205 | + "4, 1, B, Where", |
| 206 | + "5, 1, B, When", |
| 207 | + "10, 2, A, Static", |
| 208 | + "11, 2, A, Dynamic", |
| 209 | + "8, 2, B, Case")), |
| 210 | + arguments( |
| 211 | + false, |
| 212 | + Arrays.asList( |
| 213 | + overwriteRow(4, 1, "B", "Where"), |
| 214 | + overwriteRow(5, 1, "B", "When"), |
| 215 | + overwriteRow(10, 2, "A", "Static"), |
| 216 | + overwriteRow(11, 2, "A", "Dynamic")), |
| 217 | + Collections.emptyMap(), |
| 218 | + Arrays.asList( |
| 219 | + "4, 1, B, Where", |
| 220 | + "5, 1, B, When", |
| 221 | + "10, 2, A, Static", |
| 222 | + "11, 2, A, Dynamic"))); |
| 223 | + } |
| 224 | + |
| 225 | + private static InternalRow overwriteRow(Object... values) { |
| 226 | + return GenericRow.of( |
| 227 | + values[0], |
| 228 | + values[1], |
| 229 | + BinaryString.fromString((String) values[2]), |
| 230 | + BinaryString.fromString((String) values[3])); |
| 231 | + } |
| 232 | +} |
0 commit comments