diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java
index c7e0a9faa..9e80bea27 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java
@@ -8,6 +8,7 @@
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,8 +28,9 @@
  */
 @Slf4j
 public class OrphanFilesDeletionSparkApp extends BaseTableSparkApp {
-  private final long ttlSeconds;
+  private long ttlSeconds;
   private final String backupDir;
+  private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3;
 
   public OrphanFilesDeletionSparkApp(
       String jobId,
@@ -44,6 +46,7 @@ public OrphanFilesDeletionSparkApp(
 
   @Override
   protected void runInner(Operations ops) {
+    updateTtlSeconds(ops);
     long olderThanTimestampMillis =
         System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds);
     Table table = ops.getTable(fqtn);
@@ -71,6 +74,36 @@ protected void runInner(Operations ops) {
         Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn));
   }
 
+  /**
+   * Validate and keep min OFD TTL for replica table as 3 days if provided TTL is less than 3 days
+   *
+   * @param ops
+   */
+  private void updateTtlSeconds(Operations ops) {
+    Table table = ops.getTable(fqtn);
+    String tableType =
+        table
+            .properties()
+            .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY);
+    // Check if replica table and update TTL
+    if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)) {
+      long days = Duration.ofSeconds(ttlSeconds).toDays();
+      // Keep the min default OFD TTL for replica tables
+      if (days < DEFAULT_MIN_OFD_TTL_IN_DAYS) {
+        ttlSeconds = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS);
+      }
+    }
+  }
+
+  /**
+   * Get ttl in seconds
+   *
+   * @return long
+   */
+  protected long getTtlSeconds() {
+    return ttlSeconds;
+  }
+
   public static void main(String[] args) {
     OtelEmitter otelEmitter =
         new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java
index d001d2f23..8588c7337 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java
@@ -53,6 +53,9 @@ public final class AppConstants {
   // Maintenance jobs table properties keys
   public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
   public static final String BACKUP_DIR_KEY = "retention.backup.dir";
+  public static final String OPENHOUSE_TABLE_TYPE_KEY = "openhouse.tableType";
+  public static final String TABLE_TYPE_PRIMARY = "PRIMARY_TABLE";
+  public static final String TABLE_TYPE_REPLICA = "REPLICA_TABLE";
 
   private AppConstants() {}
 }
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java
new file mode 100644
index 000000000..33109b4cf
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkAppTest.java
@@ -0,0 +1,98 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
+import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
+import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
+import java.util.Arrays;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class OrphanFilesDeletionSparkAppTest extends OpenHouseSparkITest {
+  private final OtelEmitter otelEmitter =
+      new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
+
+  @Test
+  public void testTtlSecondsPrimaryTableOneDayTtl() throws Exception {
+    final String tableName = "db.test_ofd1";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      // create table
+      prepareTable(ops, tableName);
+      // create and test ofd spark job
+      OrphanFilesDeletionSparkApp app =
+          new OrphanFilesDeletionSparkApp(
+              "test-ofd-job1", null, tableName, 86400, otelEmitter, ".backup");
+      Assertions.assertEquals(86400, app.getTtlSeconds());
+    }
+  }
+
+  @Test
+  public void testTtlSecondsPrimaryTableThreeDayTtl() throws Exception {
+    final String tableName = "db.test_ofd2";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      // create table
+      prepareTable(ops, tableName);
+      // create and test ofd spark job
+      OrphanFilesDeletionSparkApp app =
+          new OrphanFilesDeletionSparkApp(
+              "test-ofd-job2", null, tableName, 259200, otelEmitter, ".backup");
+      Assertions.assertEquals(259200, app.getTtlSeconds());
+    }
+  }
+
+  @Test
+  public void testTtlSecondsReplicaTableOneDayTtl() throws Exception {
+    final String tableName = "db.test_ofd3";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      // create table
+      prepareReplicaTable(ops, tableName);
+      // create and test ofd spark job
+      OrphanFilesDeletionSparkApp app =
+          new OrphanFilesDeletionSparkApp(
+              "test-ofd-job3", null, tableName, 86400, otelEmitter, ".backup");
+      app.runInner(ops);
+      Assertions.assertEquals(259200, app.getTtlSeconds());
+    }
+  }
+
+  @Test
+  public void testTtlSecondsReplicaTableThreeDayTtl() throws Exception {
+    final String tableName = "db.test_ofd4";
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      // create table
+      prepareReplicaTable(ops, tableName);
+      // create and test ofd spark job
+      OrphanFilesDeletionSparkApp app =
+          new OrphanFilesDeletionSparkApp(
+              "test-ofd-job4", null, tableName, 259200, otelEmitter, ".backup");
+      app.runInner(ops);
+      Assertions.assertEquals(259200, app.getTtlSeconds());
+    }
+  }
+
+  private static void prepareTable(Operations ops, String tableName) {
+    ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
+    ops.spark()
+        .sql(
+            String.format(
+                "CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts))", tableName))
+        .show();
+    ops.spark().sql(String.format("SHOW TBLPROPERTIES %s", tableName)).show(false);
+  }
+
+  private static void prepareReplicaTable(Operations ops, String tableName) {
+    ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
+    ops.spark()
+        .sql(
+            String.format(
+                "CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts)) "
+                    + "TBLPROPERTIES ('openhouse.tableType' = 'REPLICA_TABLE')",
+                tableName))
+        .show();
+    ops.spark().sql(String.format("SHOW TBLPROPERTIES %s", tableName)).show(false);
+  }
+}