Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
Set<String> skipDBs, Set<String> skipTables) {
Set<CompactionInfo> compactionTargets = Sets.newHashSet();

getTableNames().stream()
getTables().stream()
.filter(table -> !skipDBs.contains(table.getDb()))
.filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
.map(table -> {
Expand Down Expand Up @@ -126,9 +126,9 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
return compactionTargets;
}

private List<org.apache.hadoop.hive.common.TableName> getTableNames() {
private List<org.apache.hadoop.hive.common.TableName> getTables() {
try {
return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTableNames();
return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables();
} catch (Exception e) {
throw new RuntimeMetaException(e, "Error getting table names");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,21 +79,35 @@ public void run() {

private void expireTables(String catalogName, String dbPattern, String tablePattern) {
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
List<org.apache.hadoop.hive.metastore.api.Table> tables =
IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize);
// TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly,
// avoiding the need for subsequent msc.getTable calls to fetch each matched table individually
List<TableName> tables = IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables();

LOG.debug("{} candidate tables found", tables.size());
for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
expireSnapshotsForTable(getIcebergTable(table));

for (TableName table : tables) {
try {
expireSnapshotsForTable(getIcebergTable(table, msc));
} catch (Exception e) {
LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}",
catalogName, dbPattern, tablePattern, e);
}
}
} catch (Exception e) {
throw new RuntimeException("Error while getting tables from metastore", e);
}
}

private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table table) {
TableName tableName = TableName.fromString(table.getTableName(), table.getCatName(), table.getDbName());
return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table));
private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) {
return tableCache.get(tableName, key -> {
LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName);
GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable());
try {
return IcebergTableUtil.getTable(conf, msc.getTable(request));
} catch (TException e) {
throw new RuntimeException(e);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
Expand Down Expand Up @@ -69,11 +69,8 @@ public void testIcebergTableFetched() throws Exception {

TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*");

int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
List<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize);
Assert.assertEquals("hive", tables.get(0).getCatName());
Assert.assertEquals("default", tables.get(0).getDbName());
Assert.assertEquals("iceberg_table", tables.get(0).getTableName());
List<TableName> tables = tableFetcher.getTables();
Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableIterable;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -92,7 +90,7 @@ private void buildTableFilter(String tablePattern, List<String> conditions) {
this.tableFilter = String.join(" and ", conditions);
}

public List<TableName> getTableNames() throws Exception {
public List<TableName> getTables() throws Exception {
List<TableName> candidates = new ArrayList<>();

// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
Expand All @@ -104,47 +102,21 @@ public List<TableName> getTableNames() throws Exception {
List<String> databases = client.getDatabases(catalogName, dbPattern);

for (String db : databases) {
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
return candidates;
}

public List<Table> getTables(int maxBatchSize) throws Exception {
List<Table> candidates = new ArrayList<>();

// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
if (tableTypes.isEmpty()) {
LOG.info("Table fetcher returns empty list as no table types specified");
return candidates;
}

List<String> databases = client.getDatabases(catalogName, dbPattern);

for (String db : databases) {
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
for (Table table : new TableIterable(client, db, tablesNames, maxBatchSize)) {
candidates.add(table);
Database database = client.getDatabase(catalogName, db);
if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
LOG.debug("Skipping table under database: {}", db);
continue;
}
if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
LOG.info("Skipping table that belongs to database {} being failed over.", db);
continue;
}
List<String> tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
return candidates;
}

private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception {
List<String> tableNames = new ArrayList<>();
Database database = client.getDatabase(catalogName, dbName);
if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
LOG.debug("Skipping table under database: {}", dbName);
return tableNames;
}
if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
LOG.info("Skipping table that belongs to database {} being failed over.", dbName);
return tableNames;
}
tableNames = client.listTableNamesByFilter(catalogName, dbName, tableFilter, -1);
return tableNames;
}

public static class Builder {
private final IMetaStoreClient client;
private final String catalogName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void run() {
.tableCondition(
hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ")
.build()
.getTableNames();
.getTables();

if (candidates.isEmpty()) {
LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
Expand Down
Loading