Skip to content

Commit 5564403

Browse files
authored
[core] Changelog expire should check index file existence (#4186)
1 parent 983a552 commit 5564403

File tree

4 files changed

+85
-4
lines changed

4 files changed

+85
-4
lines changed

paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ public static Consumer fromJson(String json) {
6161
public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
6262
int retryNumber = 0;
6363
MismatchedInputException exception = null;
64-
while (retryNumber++ < 5) {
64+
while (retryNumber++ < 10) {
6565
try {
6666
return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
6767
} catch (MismatchedInputException e) {
6868
// retry
6969
exception = e;
7070
try {
71-
Thread.sleep(100);
71+
Thread.sleep(1_000);
7272
} catch (InterruptedException ie) {
7373
Thread.currentThread().interrupt();
7474
throw new RuntimeException(ie);

paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
109109

110110
// index manifests
111111
String indexManifest = skippingSnapshot.indexManifest();
112-
if (indexManifest != null) {
112+
if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
113113
skippingSet.add(indexManifest);
114114
indexFileHandler.readManifest(indexManifest).stream()
115115
.map(IndexManifestEntry::indexFile)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.CatalogContext;
23+
import org.apache.paimon.catalog.CatalogFactory;
24+
import org.apache.paimon.catalog.Identifier;
25+
import org.apache.paimon.fs.Path;
26+
import org.apache.paimon.options.ExpireConfig;
27+
import org.apache.paimon.schema.Schema;
28+
import org.apache.paimon.types.DataTypes;
29+
import org.apache.paimon.utils.TraceableFileIO;
30+
31+
import org.assertj.core.api.Assertions;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.Test;
34+
35+
import java.util.UUID;
36+
37+
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
38+
39+
/** Test for changelog expire. */
40+
public class ChangelogExpireTest extends IndexFileExpireTableTest {
41+
42+
@BeforeEach
43+
public void beforeEachBase() throws Exception {
44+
CatalogContext context =
45+
CatalogContext.create(
46+
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
47+
context.options().set(CACHE_ENABLED.key(), "false");
48+
Catalog catalog = CatalogFactory.createCatalog(context);
49+
Identifier identifier = new Identifier("default", "T");
50+
catalog.createDatabase(identifier.getDatabaseName(), true);
51+
Schema schema =
52+
Schema.newBuilder()
53+
.column("pt", DataTypes.INT())
54+
.column("pk", DataTypes.INT())
55+
.column("col1", DataTypes.INT())
56+
.partitionKeys("pt")
57+
.primaryKey("pk", "pt")
58+
.option("changelog-producer", "input")
59+
.option("changelog.num-retained.max", "40")
60+
.option("snapshot.num-retained.max", "39")
61+
.options(tableOptions().toMap())
62+
.build();
63+
catalog.createTable(identifier, schema, true);
64+
table = (FileStoreTable) catalog.getTable(identifier);
65+
commitUser = UUID.randomUUID().toString();
66+
}
67+
68+
@Test
69+
public void testChangelogExpire() throws Exception {
70+
ExpireConfig expireConfig =
71+
ExpireConfig.builder().changelogRetainMax(40).snapshotRetainMax(39).build();
72+
prepareExpireTable();
73+
ExpireChangelogImpl expire =
74+
(ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig);
75+
76+
ExpireSnapshotsImpl expireSnapshots =
77+
(ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig);
78+
expireSnapshots.expireUntil(1, 7);
79+
Assertions.assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException();
80+
}
81+
}

paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void testIndexFileRollbackTag() throws Exception {
190190
assertThat(indexManifestSize()).isEqualTo(1);
191191
}
192192

193-
private void prepareExpireTable() throws Exception {
193+
protected void prepareExpireTable() throws Exception {
194194
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
195195
StreamTableWrite write = writeBuilder.newWrite();
196196
StreamTableCommit commit = writeBuilder.newCommit();

0 commit comments

Comments
 (0)