Skip to content

Commit c681655

Browse files
author
yuzelin
committed
[core] Support enable and disable tag time expiration
1 parent c38eede commit c681655

File tree

10 files changed

+63
-8
lines changed

10 files changed

+63
-8
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,12 @@
13441344
<td><p>Enum</p></td>
13451345
<td>The date format for tag periods.<br /><br />Possible values:<ul><li>"with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'</li><li>"without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd HH'</li><li>"without_dashes_and_spaces": Dates and hours without dashes and spaces, e.g., 'yyyyMMddHH'</li></ul></td>
13461346
</tr>
1347+
<tr>
1348+
<td><h5>tag.time-expire-enabled</h5></td>
1349+
<td style="word-wrap: break-word;">true</td>
1350+
<td>Boolean</td>
1351+
<td>Whether to enable tag expiration by retained time.</td>
1352+
</tr>
13471353
<tr>
13481354
<td><h5>target-file-size</h5></td>
13491355
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,12 @@ public InlineElement getDescription() {
16121612
"The default maximum time retained for newly created tags. "
16131613
+ "It affects both auto-created tags and manually created (by procedure) tags.");
16141614

1615+
public static final ConfigOption<Boolean> TAG_TIME_EXPIRE_ENABLED =
1616+
key("tag.time-expire-enabled")
1617+
.booleanType()
1618+
.defaultValue(true)
1619+
.withDescription("Whether to enable tag expiration by retained time.");
1620+
16151621
public static final ConfigOption<Boolean> TAG_AUTOMATIC_COMPLETION =
16161622
key("tag.automatic-completion")
16171623
.booleanType()
@@ -3029,6 +3035,10 @@ public Duration tagDefaultTimeRetained() {
30293035
return options.get(TAG_DEFAULT_TIME_RETAINED);
30303036
}
30313037

3038+
public boolean tagTimeExpireEnabled() {
3039+
return options.get(TAG_TIME_EXPIRE_ENABLED);
3040+
}
3041+
30323042
public boolean tagAutomaticCompletion() {
30333043
return options.get(TAG_AUTOMATIC_COMPLETION);
30343044
}

paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import org.apache.paimon.utils.SnapshotManager;
2525
import org.apache.paimon.utils.TagManager;
2626

27+
import javax.annotation.Nullable;
28+
2729
import java.util.List;
2830

2931
/** A manager to create and expire tags. */
3032
public class TagAutoManager {
3133

32-
private final TagAutoCreation tagAutoCreation;
33-
private final TagTimeExpire tagTimeExpire;
34+
@Nullable private final TagAutoCreation tagAutoCreation;
35+
@Nullable private final TagTimeExpire tagTimeExpire;
3436

35-
private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire tagTimeExpire) {
37+
private TagAutoManager(
38+
@Nullable TagAutoCreation tagAutoCreation, @Nullable TagTimeExpire tagTimeExpire) {
3639
this.tagAutoCreation = tagAutoCreation;
3740
this.tagTimeExpire = tagTimeExpire;
3841
}
@@ -60,13 +63,17 @@ public static TagAutoManager create(
6063
? null
6164
: TagAutoCreation.create(
6265
options, snapshotManager, tagManager, tagDeletion, callbacks),
63-
TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks));
66+
options.tagTimeExpireEnabled()
67+
? TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks)
68+
: null);
6469
}
6570

71+
@Nullable
6672
public TagAutoCreation getTagAutoCreation() {
6773
return tagAutoCreation;
6874
}
6975

76+
@Nullable
7077
public TagTimeExpire getTagTimeExpire() {
7178
return tagTimeExpire;
7279
}

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.flink.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.catalog.Catalog;
2223
import org.apache.paimon.table.FileStoreTable;
2324
import org.apache.paimon.tag.TagTimeExpire;
@@ -26,6 +27,7 @@
2627
import org.apache.flink.table.procedure.ProcedureContext;
2728

2829
import java.time.LocalDateTime;
30+
import java.util.Collections;
2931
import java.util.List;
3032
import java.util.TimeZone;
3133

@@ -42,6 +44,10 @@ public String[] call(ProcedureContext procedureContext, String tableId)
4244
public String[] call(ProcedureContext procedureContext, String tableId, String olderThanStr)
4345
throws Catalog.TableNotExistException {
4446
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
47+
fileStoreTable =
48+
fileStoreTable.copy(
49+
Collections.singletonMap(
50+
CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
4551
TagTimeExpire tagTimeExpire =
4652
fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
4753
if (olderThanStr != null) {

paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.ArrayList;
3333
import java.util.Arrays;
3434
import java.util.List;
35+
import java.util.concurrent.ThreadLocalRandom;
3536

3637
import static org.assertj.core.api.Assertions.assertThat;
3738
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -499,14 +500,18 @@ public void testRewriteFileIndex() {
499500

500501
@Test
501502
public void testExpireTags() throws Exception {
503+
boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
502504
sql(
503505
"CREATE TABLE T ("
504506
+ " k STRING,"
505507
+ " dt STRING,"
506508
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
507509
+ ") PARTITIONED BY (dt) WITH ("
508-
+ " 'bucket' = '1'"
509-
+ ")");
510+
+ " 'bucket' = '1',"
511+
+ " 'tag.time-expire-enabled' = '%s'"
512+
+ ")",
513+
tagTimeExpireEnabled);
514+
510515
FileStoreTable table = paimonTable("T");
511516
for (int i = 1; i <= 3; i++) {
512517
sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.flink.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.catalog.Catalog;
2223
import org.apache.paimon.table.FileStoreTable;
2324
import org.apache.paimon.tag.TagTimeExpire;
@@ -32,6 +33,7 @@
3233
import javax.annotation.Nullable;
3334

3435
import java.time.LocalDateTime;
36+
import java.util.Collections;
3537
import java.util.List;
3638
import java.util.TimeZone;
3739

@@ -52,6 +54,10 @@ public class ExpireTagsProcedure extends ProcedureBase {
5254
ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
5355
throws Catalog.TableNotExistException {
5456
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
57+
fileStoreTable =
58+
fileStoreTable.copy(
59+
Collections.singletonMap(
60+
CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
5561
TagTimeExpire tagTimeExpire =
5662
fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
5763
if (olderThanStr != null) {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.nio.file.Path;
3131
import java.time.LocalDateTime;
32+
import java.util.concurrent.ThreadLocalRandom;
3233

3334
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
3435
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
@@ -47,10 +48,13 @@ public void setUp() {
4748
@ParameterizedTest
4849
@ValueSource(booleans = {false, true})
4950
public void testExpireTags(boolean startFlinkJob) throws Exception {
51+
boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
5052
bEnv.executeSql(
5153
"CREATE TABLE T (id STRING, name STRING,"
5254
+ " PRIMARY KEY (id) NOT ENFORCED)"
53-
+ " WITH ('bucket'='1', 'write-only'='true')");
55+
+ " WITH ('bucket'='1', 'write-only'='true', 'tag.time-expire-enabled' = '"
56+
+ tagTimeExpireEnabled
57+
+ "')");
5458

5559
expireTags(startFlinkJob);
5660
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.time.LocalDateTime;
3131
import java.util.List;
32+
import java.util.concurrent.ThreadLocalRandom;
3233

3334
import static org.assertj.core.api.Assertions.assertThat;
3435

@@ -37,10 +38,12 @@ public class ExpireTagsProcedureITCase extends CatalogITCaseBase {
3738

3839
@Test
3940
public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception {
41+
boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
4042
sql(
4143
"CREATE TABLE T (id STRING, name STRING,"
4244
+ " PRIMARY KEY (id) NOT ENFORCED)"
43-
+ " WITH ('bucket'='1', 'write-only'='true')");
45+
+ " WITH ('bucket'='1', 'write-only'='true', 'tag.time-expire-enabled'='%s')",
46+
tagTimeExpireEnabled);
4447

4548
FileStoreTable table = paimonTable("T");
4649
SnapshotManager snapshotManager = table.snapshotManager();

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.spark.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.table.FileStoreTable;
2223
import org.apache.paimon.tag.TagTimeExpire;
2324
import org.apache.paimon.utils.DateTimeUtils;
@@ -31,6 +32,7 @@
3132
import org.apache.spark.unsafe.types.UTF8String;
3233

3334
import java.time.LocalDateTime;
35+
import java.util.Collections;
3436
import java.util.List;
3537
import java.util.TimeZone;
3638

@@ -73,6 +75,10 @@ public InternalRow[] call(InternalRow args) {
7375
tableIdent,
7476
table -> {
7577
FileStoreTable fileStoreTable = (FileStoreTable) table;
78+
fileStoreTable =
79+
fileStoreTable.copy(
80+
Collections.singletonMap(
81+
CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
7682
TagTimeExpire tagTimeExpire =
7783
fileStoreTable
7884
.store()

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import org.assertj.core.api.Assertions.assertThat
2828
class ExpireTagsProcedureTest extends PaimonSparkTestBase {
2929

3030
test("Paimon procedure: expire tags that reached its timeRetained") {
31+
val tagTimeExpireEnabled = scala.util.Random.nextBoolean()
3132
spark.sql(s"""
3233
|CREATE TABLE T (id STRING, name STRING)
3334
|USING PAIMON
35+
|TBLPROPERTIES ('tag.time-expire-enabled' = '$tagTimeExpireEnabled')
3436
|""".stripMargin)
3537

3638
val table = loadTable("T")

0 commit comments

Comments
 (0)