Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -27,8 +28,9 @@
*/
@Slf4j
public class OrphanFilesDeletionSparkApp extends BaseTableSparkApp {
private final long ttlSeconds;
private long ttlSeconds;
private final String backupDir;
private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3;

public OrphanFilesDeletionSparkApp(
String jobId,
Expand All @@ -44,6 +46,7 @@ public OrphanFilesDeletionSparkApp(

@Override
protected void runInner(Operations ops) {
updateTtlSeconds(ops);
long olderThanTimestampMillis =
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds);
Table table = ops.getTable(fqtn);
Expand Down Expand Up @@ -71,6 +74,36 @@ protected void runInner(Operations ops) {
Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn));
}

/**
* Validate and keep min OFD TTL for replica table as 3 days if provided TTL is less than 3 days
*
* @param ops
*/
private void updateTtlSeconds(Operations ops) {
Table table = ops.getTable(fqtn);
String tableType =
table
.properties()
.getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY);
// Check if replica table and update TTL
if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)) {
long days = Duration.ofSeconds(ttlSeconds).toDays();
// Keep the min default OFD TTL for replica tables
if (days < DEFAULT_MIN_OFD_TTL_IN_DAYS) {
ttlSeconds = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS);
}
}
}

  /**
   * Returns the effective orphan-file-deletion TTL in seconds. Note this value may have been raised
   * to the replica-table minimum by {@code updateTtlSeconds} once {@code runInner} has executed.
   * Exposed as {@code protected} so tests can observe the effective TTL.
   *
   * @return the TTL in seconds used to compute the orphan-file cutoff timestamp
   */
  protected long getTtlSeconds() {
    return ttlSeconds;
  }

public static void main(String[] args) {
OtelEmitter otelEmitter =
new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public final class AppConstants {
// Maintenance jobs table properties keys
public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
public static final String BACKUP_DIR_KEY = "retention.backup.dir";
public static final String OPENHOUSE_TABLE_TYPE_KEY = "openhouse.tableType";
public static final String TABLE_TYPE_PRIMARY = "PRIMARY_TABLE";
public static final String TABLE_TYPE_REPLICA = "REPLICA_TABLE";

  private AppConstants() {} // utility class: prevent instantiation
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.openhouse.jobs.spark;

import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
import com.linkedin.openhouse.common.metrics.OtelEmitter;
import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Arrays;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class OrphanFilesDeletionSparkAppTest extends OpenHouseSparkITest {
  /** One day expressed in seconds. */
  private static final long ONE_DAY_SECONDS = 86400L;
  /** Three days in seconds — the minimum OFD TTL enforced for replica tables. */
  private static final long THREE_DAYS_SECONDS = 259200L;

  private final OtelEmitter otelEmitter =
      new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));

  @Test
  public void testTtlSecondsPrimaryTableOneDayTtl() throws Exception {
    // A primary table keeps a caller-supplied TTL even when it is below the replica minimum.
    runTtlScenario("db.test_ofd1", "test-ofd-job1", false, ONE_DAY_SECONDS, ONE_DAY_SECONDS);
  }

  @Test
  public void testTtlSecondsPrimaryTableThreeDayTtl() throws Exception {
    // A primary table keeps a caller-supplied TTL at the minimum boundary.
    runTtlScenario("db.test_ofd2", "test-ofd-job2", false, THREE_DAYS_SECONDS, THREE_DAYS_SECONDS);
  }

  @Test
  public void testTtlSecondsReplicaTableOneDayTtl() throws Exception {
    // A replica table's TTL below the minimum is raised to three days.
    runTtlScenario("db.test_ofd3", "test-ofd-job3", true, ONE_DAY_SECONDS, THREE_DAYS_SECONDS);
  }

  @Test
  public void testTtlSecondsReplicaTableThreeDayTtl() throws Exception {
    // A replica table's TTL at the minimum is left unchanged.
    runTtlScenario("db.test_ofd4", "test-ofd-job4", true, THREE_DAYS_SECONDS, THREE_DAYS_SECONDS);
  }

  /**
   * Creates a table (primary or replica), runs the OFD app against it, and asserts the effective
   * TTL. Running the app in every case (including primary tables) verifies that the replica-only
   * minimum is actually applied — or actually skipped — by {@code runInner}.
   *
   * @param tableName fully-qualified table name to create and clean up
   * @param jobId job identifier passed to the app
   * @param replica whether to tag the table as a replica via table properties
   * @param ttlSeconds TTL supplied to the app
   * @param expectedTtlSeconds TTL expected after the app has run
   */
  private void runTtlScenario(
      String tableName, String jobId, boolean replica, long ttlSeconds, long expectedTtlSeconds)
      throws Exception {
    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
      if (replica) {
        prepareReplicaTable(ops, tableName);
      } else {
        prepareTable(ops, tableName);
      }
      OrphanFilesDeletionSparkApp app =
          new OrphanFilesDeletionSparkApp(
              jobId, null, tableName, ttlSeconds, otelEmitter, ".backup");
      app.runInner(ops);
      Assertions.assertEquals(expectedTtlSeconds, app.getTtlSeconds());
    }
  }

  /** Drops and recreates a plain (primary) partitioned table. */
  private static void prepareTable(Operations ops, String tableName) {
    ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
    ops.spark()
        .sql(
            String.format(
                "CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts))", tableName))
        .show();
    ops.spark().sql(String.format("SHOW TBLPROPERTIES %s", tableName)).show(false);
  }

  /** Drops and recreates a partitioned table tagged as a replica via table properties. */
  private static void prepareReplicaTable(Operations ops, String tableName) {
    ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
    ops.spark()
        .sql(
            String.format(
                "CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts)) "
                    + "TBLPROPERTIES ('openhouse.tableType' = 'REPLICA_TABLE')",
                tableName))
        .show();
    ops.spark().sql(String.format("SHOW TBLPROPERTIES %s", tableName)).show(false);
  }
}