Skip to content

Commit 6c7f32c

Browse files
authored
[core] remove PrimaryKeyFileStoreTableTest.testStreamingChangelogCompatibility02 test method (#4170)
1 parent bb453c2 commit 6c7f32c

File tree

7 files changed

+24
-133
lines changed

7 files changed

+24
-133
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
</tr>
6565
<tr>
6666
<td><h5>changelog-file.prefix</h5></td>
67-
<td style="word-wrap: break-word;">(none)</td>
67+
<td style="word-wrap: break-word;">"changelog-"</td>
6868
<td>String</td>
6969
<td>Specify the file name prefix of changelog files.</td>
7070
</tr>
@@ -212,6 +212,12 @@
212212
<td>Duration</td>
213213
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
214214
</tr>
215+
<tr>
216+
<td><h5>data-file.prefix</h5></td>
217+
<td style="word-wrap: break-word;">"data-"</td>
218+
<td>String</td>
219+
<td>Specify the file name prefix of data files.</td>
220+
</tr>
215221
<tr>
216222
<td><h5>delete-file.thread-num</h5></td>
217223
<td style="word-wrap: break-word;">(none)</td>
@@ -326,12 +332,6 @@
326332
<td>Map</td>
327333
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
328334
</tr>
329-
<tr>
330-
<td><h5>data-file.prefix</h5></td>
331-
<td style="word-wrap: break-word;">(none)</td>
332-
<td>String</td>
333-
<td>Specify the file name prefix of data files.</td>
334-
</tr>
335335
<tr>
336336
<td><h5>force-lookup</h5></td>
337337
<td style="word-wrap: break-word;">false</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public class CoreOptions implements Serializable {
174174
public static final ConfigOption<String> DATA_FILE_PREFIX =
175175
key("data-file.prefix")
176176
.stringType()
177-
.noDefaultValue()
177+
.defaultValue("data-")
178178
.withDescription("Specify the file name prefix of data files.");
179179

180180
public static final ConfigOption<String> CHANGELOG_FILE_PREFIX =
181181
key("changelog-file.prefix")
182182
.stringType()
183-
.noDefaultValue()
183+
.defaultValue("changelog-")
184184
.withDescription("Specify the file name prefix of changelog files.");
185185

186186
public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =

paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
2222
import org.apache.paimon.fs.Path;
23-
import org.apache.paimon.utils.StringUtils;
2423

2524
import javax.annotation.concurrent.ThreadSafe;
2625

@@ -31,10 +30,6 @@
3130
@ThreadSafe
3231
public class DataFilePathFactory {
3332

34-
public static final String DEFAULT_DATA_FILE_PREFIX = "data-";
35-
36-
public static final String DEFAULT_CHANGELOG_FILE_PREFIX = "changelog-";
37-
3833
public static final String INDEX_PATH_SUFFIX = ".index";
3934

4035
private final Path parent;
@@ -59,17 +54,11 @@ public DataFilePathFactory(
5954
}
6055

6156
public Path newPath() {
62-
if (!StringUtils.isNullOrWhitespaceOnly(dataFilePrefix)) {
63-
return newPath(dataFilePrefix);
64-
}
65-
return newPath(DEFAULT_DATA_FILE_PREFIX);
57+
return newPath(dataFilePrefix);
6658
}
6759

6860
public Path newChangelogPath() {
69-
if (!StringUtils.isNullOrWhitespaceOnly(changelogFilePrefix)) {
70-
return newPath(changelogFilePrefix);
71-
}
72-
return newPath(DEFAULT_CHANGELOG_FILE_PREFIX);
61+
return newPath(changelogFilePrefix);
7362
}
7463

7564
private Path newPath(String prefix) {

paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.paimon.fs.Path;
2929
import org.apache.paimon.fs.local.LocalFileIO;
3030
import org.apache.paimon.io.DataFileMeta;
31-
import org.apache.paimon.io.DataFilePathFactory;
3231
import org.apache.paimon.io.FileReaderFactory;
3332
import org.apache.paimon.io.KeyValueFileReaderFactory;
3433
import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -109,13 +108,14 @@ public void beforeEach() throws Exception {
109108
public void testRewriteFailAndCleanupFiles(boolean rewriteChangelog) throws Exception {
110109
List<List<SortedRun>> sections = createTestSections(2);
111110
Path testPath = new Path(path, UUID.randomUUID().toString());
111+
CoreOptions coreOptions = new CoreOptions(new Options());
112112
try (ChangelogMergeTreeRewriter rewriter =
113113
new TestRewriter(
114114
createReaderFactory(schemaManager, tableSchema, keyType, valueType),
115115
createWriterFactory(testPath, keyType, valueType),
116116
comparator,
117117
new MergeSorter(
118-
new CoreOptions(new Options()),
118+
coreOptions,
119119
tableSchema.logicalPrimaryKeysType(),
120120
tableSchema.logicalRowType(),
121121
null),
@@ -136,13 +136,12 @@ public void testRewriteFailAndCleanupFiles(boolean rewriteChangelog) throws Exce
136136
p.getFileName()
137137
.toString()
138138
.startsWith(
139-
DataFilePathFactory
140-
.DEFAULT_DATA_FILE_PREFIX)
139+
coreOptions.dataFilePrefix())
141140
|| p.getFileName()
142141
.toString()
143142
.startsWith(
144-
DataFilePathFactory
145-
.DEFAULT_CHANGELOG_FILE_PREFIX))
143+
coreOptions
144+
.changelogFilePrefix()))
146145
.collect(Collectors.toList());
147146
Assertions.assertEquals(0, files.size());
148147
}
@@ -153,13 +152,14 @@ public void testRewriteFailAndCleanupFiles(boolean rewriteChangelog) throws Exce
153152
public void testRewriteSuccess(boolean rewriteChangelog) throws Exception {
154153
List<List<SortedRun>> sections = createTestSections(2);
155154
Path testPath = new Path(path, UUID.randomUUID().toString());
155+
CoreOptions coreOptions = new CoreOptions(new Options());
156156
try (ChangelogMergeTreeRewriter rewriter =
157157
new TestRewriter(
158158
createReaderFactory(schemaManager, tableSchema, keyType, valueType),
159159
createWriterFactory(testPath, keyType, valueType),
160160
comparator,
161161
new MergeSorter(
162-
new CoreOptions(new Options()),
162+
coreOptions,
163163
tableSchema.logicalPrimaryKeysType(),
164164
tableSchema.logicalRowType(),
165165
null),
@@ -175,13 +175,12 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception {
175175
p.getFileName()
176176
.toString()
177177
.startsWith(
178-
DataFilePathFactory
179-
.DEFAULT_DATA_FILE_PREFIX)
178+
coreOptions.dataFilePrefix())
180179
|| p.getFileName()
181180
.toString()
182181
.startsWith(
183-
DataFilePathFactory
184-
.DEFAULT_CHANGELOG_FILE_PREFIX))
182+
coreOptions
183+
.changelogFilePrefix()))
185184
.collect(Collectors.toList());
186185
if (rewriteChangelog) {
187186
Assertions.assertEquals(2, files.size()); // changelog + data file

paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@
8080
import java.util.function.Predicate;
8181
import java.util.stream.Collectors;
8282

83-
import static org.apache.paimon.io.DataFilePathFactory.DEFAULT_CHANGELOG_FILE_PREFIX;
84-
import static org.apache.paimon.io.DataFilePathFactory.DEFAULT_DATA_FILE_PREFIX;
8583
import static org.apache.paimon.utils.BranchManager.branchPath;
8684
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
8785
import static org.assertj.core.api.Assertions.assertThat;
@@ -587,7 +585,9 @@ private int randomlyAddNonUsedDataFiles() throws IOException {
587585
path,
588586
1,
589587
Arrays.asList(
590-
DEFAULT_DATA_FILE_PREFIX, DEFAULT_CHANGELOG_FILE_PREFIX, "UNKNOWN-"));
588+
table.coreOptions().dataFilePrefix(),
589+
table.coreOptions().changelogFilePrefix(),
590+
"UNKNOWN-"));
591591
}
592592
addedFiles += corruptedBuckets.size();
593593

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import org.apache.paimon.table.source.Split;
5959
import org.apache.paimon.table.source.StreamTableScan;
6060
import org.apache.paimon.table.source.TableRead;
61-
import org.apache.paimon.table.source.TableScan;
6261
import org.apache.paimon.table.source.snapshot.SnapshotReader;
6362
import org.apache.paimon.table.system.AuditLogTable;
6463
import org.apache.paimon.table.system.FileMonitorTable;
@@ -67,7 +66,6 @@
6766
import org.apache.paimon.types.DataTypes;
6867
import org.apache.paimon.types.RowKind;
6968
import org.apache.paimon.types.RowType;
70-
import org.apache.paimon.utils.CompatibilityTestUtils;
7169
import org.apache.paimon.utils.Pair;
7270

7371
import org.assertj.core.api.Assertions;
@@ -605,101 +603,6 @@ private void innerTestStreamingFullChangelog(Consumer<Options> configure) throws
605603
"+U 2|40|242|binary|varbinary|mapKey:mapVal|multiset");
606604
}
607605

608-
@Test
609-
public void testStreamingChangelogCompatibility02() throws Exception {
610-
// already contains 2 commits
611-
CompatibilityTestUtils.unzip(
612-
"compatibility/table-changelog-0.2.zip", tablePath.toUri().getPath());
613-
FileStoreTable table =
614-
createFileStoreTable(
615-
conf -> conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT),
616-
COMPATIBILITY_ROW_TYPE);
617-
618-
List<List<List<String>>> expected =
619-
Arrays.asList(
620-
// first changelog snapshot
621-
Arrays.asList(
622-
// partition 1
623-
Arrays.asList(
624-
"+I 1|10|100|binary|varbinary",
625-
"+I 1|20|200|binary|varbinary",
626-
"-D 1|10|100|binary|varbinary",
627-
"+I 1|10|101|binary|varbinary",
628-
"-U 1|10|101|binary|varbinary",
629-
"+U 1|10|102|binary|varbinary"),
630-
// partition 2
631-
Collections.singletonList("+I 2|10|300|binary|varbinary")),
632-
// second changelog snapshot
633-
Arrays.asList(
634-
// partition 1
635-
Collections.singletonList("-D 1|20|200|binary|varbinary"),
636-
// partition 2
637-
Arrays.asList(
638-
"-U 2|10|300|binary|varbinary",
639-
"+U 2|10|301|binary|varbinary",
640-
"+I 2|20|400|binary|varbinary")),
641-
// third changelog snapshot
642-
Arrays.asList(
643-
// partition 1
644-
Arrays.asList(
645-
"-U 1|10|102|binary|varbinary",
646-
"+U 1|10|103|binary|varbinary",
647-
"+I 1|20|201|binary|varbinary"),
648-
// partition 2
649-
Collections.singletonList("-D 2|10|301|binary|varbinary")));
650-
651-
StreamTableScan scan = table.newStreamScan();
652-
scan.restore(1L);
653-
654-
Function<Integer, Void> assertNextSnapshot =
655-
i -> {
656-
TableScan.Plan plan = scan.plan();
657-
assertThat(plan).isNotNull();
658-
659-
List<Split> splits = plan.splits();
660-
TableRead read = table.newRead();
661-
for (int j = 0; j < 2; j++) {
662-
try {
663-
assertThat(
664-
getResult(
665-
read,
666-
splits,
667-
binaryRow(j + 1),
668-
0,
669-
COMPATIBILITY_CHANGELOG_ROW_TO_STRING))
670-
.isEqualTo(expected.get(i).get(j));
671-
} catch (Exception e) {
672-
throw new RuntimeException(e);
673-
}
674-
}
675-
676-
return null;
677-
};
678-
679-
for (int i = 0; i < 2; i++) {
680-
assertNextSnapshot.apply(i);
681-
}
682-
683-
// no more changelog
684-
assertThat(scan.plan().splits()).isEmpty();
685-
686-
// write another commit
687-
StreamTableWrite write = table.newWrite(commitUser);
688-
StreamTableCommit commit = table.newCommit(commitUser);
689-
write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
690-
write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
691-
write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
692-
write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
693-
commit.commit(2, write.prepareCommit(true, 2));
694-
write.close();
695-
commit.close();
696-
697-
assertNextSnapshot.apply(2);
698-
699-
// no more changelog
700-
assertThat(scan.plan().splits()).isEmpty();
701-
}
702-
703606
private void writeData() throws Exception {
704607
FileStoreTable table = createFileStoreTable();
705608
StreamTableWrite write = table.newWrite(commitUser);
Binary file not shown.

0 commit comments

Comments
 (0)