Skip to content

Commit 8c41658

Browse files
authored
[flink] Fix that action/procedure cannot remove unexisting files from manifests when dv enabled. (#6854)
1 parent 4fe9e1f commit 8c41658

File tree

5 files changed

+31
-15
lines changed

5 files changed

+31
-15
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.table.source.Split;
2828
import org.apache.paimon.utils.FileOperationThreadPool;
2929
import org.apache.paimon.utils.FileStorePathFactory;
30+
import org.apache.paimon.utils.Filter;
3031
import org.apache.paimon.utils.ThreadPoolUtils;
3132

3233
import java.io.IOException;
@@ -58,6 +59,7 @@ public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws
5859
Map<Integer, Map<String, DataFileMeta>> result = new HashMap<>();
5960
List<Split> splits =
6061
table.newScan()
62+
.withLevelFilter(Filter.alwaysTrue())
6163
.withPartitionFilter(Collections.singletonList(partition))
6264
.plan()
6365
.splits();

paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838

3939
import org.junit.jupiter.api.io.TempDir;
4040
import org.junit.jupiter.params.ParameterizedTest;
41-
import org.junit.jupiter.params.provider.ValueSource;
41+
import org.junit.jupiter.params.provider.Arguments;
42+
import org.junit.jupiter.params.provider.MethodSource;
4243

4344
import java.util.ArrayList;
4445
import java.util.Arrays;
@@ -50,23 +51,30 @@
5051
import java.util.concurrent.ThreadLocalRandom;
5152
import java.util.function.Function;
5253
import java.util.stream.Collectors;
54+
import java.util.stream.Stream;
5355

5456
import static org.assertj.core.api.Assertions.assertThat;
57+
import static org.junit.jupiter.params.provider.Arguments.arguments;
5558

5659
/** Tests for {@link ListUnexistingFiles}. */
5760
public class ListUnexistingFilesTest {
5861

5962
@TempDir java.nio.file.Path tempDir;
6063

64+
private static Stream<Arguments> params() {
65+
return Stream.of(
66+
arguments(-1, false), arguments(3, false), arguments(-1, true), arguments(3, true));
67+
}
68+
6169
@ParameterizedTest
62-
@ValueSource(ints = {-1, 3})
63-
public void testListFiles(int bucket) throws Exception {
70+
@MethodSource("params")
71+
public void testListFiles(int bucket, boolean dvEnabled) throws Exception {
6472
int numPartitions = 2;
6573
int numFiles = 10;
6674
int[] numDeletes = new int[numPartitions];
6775
FileStoreTable table =
6876
prepareRandomlyDeletedTable(
69-
tempDir.toString(), "mydb", "t", bucket, numFiles, numDeletes);
77+
tempDir.toString(), "mydb", "t", bucket, numFiles, numDeletes, dvEnabled);
7078

7179
Function<Integer, BinaryRow> binaryRow =
7280
i -> {
@@ -90,7 +98,8 @@ public static FileStoreTable prepareRandomlyDeletedTable(
9098
String tableName,
9199
int bucket,
92100
int numFiles,
93-
int[] numDeletes)
101+
int[] numDeletes,
102+
boolean dvEnabled)
94103
throws Exception {
95104
RowType rowType =
96105
RowType.of(
@@ -102,12 +111,16 @@ public static FileStoreTable prepareRandomlyDeletedTable(
102111
if (bucket > 0) {
103112
options.put(CoreOptions.BUCKET_KEY.key(), "id");
104113
}
114+
if (dvEnabled) {
115+
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
116+
}
105117
FileStoreTable table =
106118
createPaimonTable(
107119
warehouse,
108120
databaseName,
109121
tableName,
110122
rowType,
123+
dvEnabled ? Arrays.asList("pt", "id") : Collections.emptyList(),
111124
Collections.singletonList("pt"),
112125
options);
113126

@@ -124,7 +137,11 @@ public static FileStoreTable prepareRandomlyDeletedTable(
124137
int identifier = 0;
125138
for (int i = 0; i < numPartitions; i++) {
126139
for (int j = 0; j < numFiles; j++) {
127-
write.write(GenericRow.of(i, random.nextInt(), random.nextLong()));
140+
if (dvEnabled && bucket == -1) {
141+
write.write(GenericRow.of(i, random.nextInt(), random.nextLong()), 0);
142+
} else {
143+
write.write(GenericRow.of(i, random.nextInt(), random.nextLong()));
144+
}
128145
identifier++;
129146
commit.commit(identifier, write.prepareCommit(false, identifier));
130147
}
@@ -157,19 +174,15 @@ private static FileStoreTable createPaimonTable(
157174
String databaseName,
158175
String tableName,
159176
RowType rowType,
177+
List<String> primaryKeys,
160178
List<String> partitionKeys,
161179
Map<String, String> customOptions)
162180
throws Exception {
163181
LocalFileIO fileIO = LocalFileIO.create();
164182
Path path = new Path(warehouse);
165183

166184
Schema schema =
167-
new Schema(
168-
rowType.getFields(),
169-
partitionKeys,
170-
Collections.emptyList(),
171-
customOptions,
172-
"");
185+
new Schema(rowType.getFields(), partitionKeys, primaryKeys, customOptions, "");
173186

174187
try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
175188
paimonCatalog.createDatabase(databaseName, true);

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void testAction(int bucket) throws Exception {
4141
int numFiles = 10;
4242
int[] numDeletes = new int[numPartitions];
4343
ListUnexistingFilesTest.prepareRandomlyDeletedTable(
44-
warehouse, "mydb", "t", bucket, numFiles, numDeletes);
44+
warehouse, "mydb", "t", bucket, numFiles, numDeletes, false);
4545

4646
RemoveUnexistingFilesAction action =
4747
createAction(

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testProcedure(int bucket) throws Exception {
4343
int numFiles = 10;
4444
int[] numDeletes = new int[numPartitions];
4545
ListUnexistingFilesTest.prepareRandomlyDeletedTable(
46-
warehouse, "mydb", "t", bucket, numFiles, numDeletes);
46+
warehouse, "mydb", "t", bucket, numFiles, numDeletes, false);
4747

4848
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
4949
tEnv.executeSql(

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class RemoveUnexistingFilesProcedureTest extends PaimonSparkTestBase {
4646
tableName,
4747
bucket,
4848
numFiles,
49-
numDeletes)
49+
numDeletes,
50+
false)
5051

5152
val actual = new Array[Int](numPartitions)
5253
val pattern = "pt=(\\d+?)/".r

0 commit comments

Comments
 (0)