@@ -88,7 +88,7 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
Set<String> skipDBs, Set<String> skipTables) {
Set<CompactionInfo> compactionTargets = Sets.newHashSet();

getTables().stream()
getTableNames().stream()
.filter(table -> !skipDBs.contains(table.getDb()))
.filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
.map(table -> {
@@ -126,9 +126,9 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
return compactionTargets;
}

private List<org.apache.hadoop.hive.common.TableName> getTables() {
private List<org.apache.hadoop.hive.common.TableName> getTableNames() {
try {
return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables();
return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTableNames();
} catch (Exception e) {
throw new RuntimeMetaException(e, "Error getting table names");
}
@@ -29,15 +29,13 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -79,35 +77,21 @@ public void run() {

private void expireTables(String catalogName, String dbPattern, String tablePattern) {
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
// TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly,
// avoiding the need for subsequent msc.getTable calls to fetch each matched table individually
List<TableName> tables = IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables();

int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
List<org.apache.hadoop.hive.metastore.api.Table> tables =
IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize);
LOG.debug("{} candidate tables found", tables.size());

for (TableName table : tables) {
try {
expireSnapshotsForTable(getIcebergTable(table, msc));
} catch (Exception e) {
LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}",
catalogName, dbPattern, tablePattern, e);
}
for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
expireSnapshotsForTable(getIcebergTable(table));
@okumin (Contributor), Aug 16, 2025:

I recalled that we would like to retain the try-catch. We intentionally added it to avoid skipping everything when a single expiration fails.
See also: #5786 (comment)
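
A minimal sketch (not part of this diff) of how the per-table try-catch could be restored around the new loop, reusing the error handling from the removed code above:

    for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
      try {
        expireSnapshotsForTable(getIcebergTable(table));
      } catch (Exception e) {
        // Log and continue, so a single failing table does not abort snapshot expiry for the rest.
        LOG.error("Exception while running iceberg expiry service on table {}.{}",
            table.getDbName(), table.getTableName(), e);
      }
    }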

}
} catch (Exception e) {
throw new RuntimeException("Error while getting tables from metastore", e);
}
}

private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) {
return tableCache.get(tableName, key -> {
LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName);
GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable());
try {
return IcebergTableUtil.getTable(conf, msc.getTable(request));
} catch (TException e) {
throw new RuntimeException(e);
}
});
private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table table) {
TableName tableName = TableName.fromString(table.getTableName(), table.getCatName(), table.getDbName());
return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table));
}

/**
@@ -23,11 +23,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -69,8 +69,11 @@ public void testIcebergTableFetched() throws Exception {

TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*");

List<TableName> tables = tableFetcher.getTables();
Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0));
int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
List<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize);
Assert.assertEquals("hive", tables.get(0).getCatName());
Assert.assertEquals("default", tables.get(0).getDbName());
Assert.assertEquals("iceberg_table", tables.get(0).getTableName());
}

@Test
@@ -20,9 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableIterable;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +92,7 @@ private void buildTableFilter(String tablePattern, List<String> conditions) {
this.tableFilter = String.join(" and ", conditions);
}

public List<TableName> getTables() throws Exception {
public List<TableName> getTableNames() throws Exception {
List<TableName> candidates = new ArrayList<>();

// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
@@ -102,21 +104,47 @@ public List<TableName> getTables() throws Exception {
List<String> databases = client.getDatabases(catalogName, dbPattern);

for (String db : databases) {
Database database = client.getDatabase(catalogName, db);
if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
LOG.debug("Skipping table under database: {}", db);
continue;
}
if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
LOG.info("Skipping table that belongs to database {} being failed over.", db);
continue;
}
List<String> tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1);
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
return candidates;
}

public List<Table> getTables(int maxBatchSize) throws Exception {
List<Table> candidates = new ArrayList<>();

// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
if (tableTypes.isEmpty()) {
LOG.info("Table fetcher returns empty list as no table types specified");
return candidates;
}

List<String> databases = client.getDatabases(catalogName, dbPattern);

for (String db : databases) {
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
@deniskuzZ (Member), Aug 15, 2025:

@Neer393 I don't understand what you have optimized here.
You are still making multiple calls: one to get the table names and another to get the table objects. Why not get the table objects directly?

Also, have you considered the memory impact of loading everything into the heap? You could have iterated over TableIterable instead. I don't think this is a robust solution; it can potentially lead to OOM.
cc @dengzhhu653, @wecharyu

@Neer393 (Contributor, Author), Aug 15, 2025:

The earlier implementation made one msc call to get the table names and then one msc call per table name to fetch the corresponding HMS table object.

The new implementation reduces the msc calls: a single call fetches all table names, and then, via TableIterable, the number of msc calls for fetching table objects becomes number of tables / BATCH_RETRIEVE_MAX (default 300).

So in the older implementation the number of msc calls = 1 + number of tables, whereas in the newer implementation the number of msc calls = 1 + (number of tables / BATCH_RETRIEVE_MAX).
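
As a rough illustration (the table count is assumed, not taken from this PR): with 30,000 matching tables and the default BATCH_RETRIEVE_MAX of 300, the old path issues about 1 + 30,000 = 30,001 msc calls, while the batched path issues about 1 + 30,000 / 300 = 101.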

@Neer393 (Contributor, Author):

In my earlier implementation I had the same proposal of fetching table objects directly, for which I implemented a dedicated HMS API endpoint along the lines of listTableNamesByFilter, but the idea was dropped by @vikramahuja1001.

@deniskuzZ (Member), Aug 15, 2025:

In order to use batching you need to have the table list to fetch - that's ok. However, instead of working with the batches, you load everything into memory.
Could you refactor to use Iterable (i.e. make getTables return Iterable<Table>)?
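
A rough sketch (not part of this PR) of that suggestion, assuming the TableFetcher fields used in this diff (client, catalogName, dbPattern), the getTableNamesForDatabase helper added below, and Guava's Iterables.concat to chain the per-database batch iterables lazily:

    public Iterable<Table> getTables(int maxBatchSize) throws Exception {
      List<Iterable<Table>> perDb = new ArrayList<>();
      for (String db : client.getDatabases(catalogName, dbPattern)) {
        List<String> tableNames = getTableNamesForDatabase(catalogName, db);
        // TableIterable fetches table objects in batches of maxBatchSize,
        // so callers hold at most one batch in memory while iterating.
        perDb.add(new TableIterable(client, db, tableNames, maxBatchSize));
      }
      return com.google.common.collect.Iterables.concat(perDb);
    }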

@Neer393 (Contributor, Author):

Okay, so for this fix should I create a new JIRA, or, since I am already working on https://issues.apache.org/jira/browse/HIVE-28974, which relates only to IcebergHouseKeeperService, should I attach the fix to that JIRA?
Whatever you suggest is fine with me.

Contributor:

+1. Using the TableIterable is more reasonable, to avoid a possible OOM.

Contributor:

Let me summarize the points.

  • This PR reduces the number of API calls from O(num-tables) to O(num-tables / batch size). It's neat 👍
  • This PR requires O(num-tables * tbl-size) space. We'd like to reduce it.
  • Regardless of this PR, we retain the O(num-tables * length-of-table-name) space. Is that OK, or should we optimize it?

@Neer393 (Contributor, Author):

AFAIK, it's a tradeoff between the number of msc calls and the space: if we try to decrease the number of tables held in memory, we increase the number of msc calls, and then there would be no point to this JIRA.
Please correct me if I am wrong, but this is what I think.
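
For a rough sense of scale (sizes assumed, not measured): if an HMS Table object averages a few KB, holding 30,000 of them at once is on the order of 100 MB of heap, whereas iterating batch-by-batch keeps at most BATCH_RETRIEVE_MAX (300 by default) table objects in memory at a time, with the O(num-tables) list of names retained in both cases.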

@deniskuzZ (Member), Aug 19, 2025:

My concern was related to the fetch logic, where we load all Hive table objects into memory instead of using the batch iterator.

We also make O(num-tables) calls to load the Iceberg tables. Can we optimize here? We then put those into a separate cache; maybe we could use CachingCatalog instead?

tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table))

for (Table table : new TableIterable(client, db, tablesNames, maxBatchSize)) {
candidates.add(table);
}
}
return candidates;
}

private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception {
List<String> tableNames = new ArrayList<>();
Database database = client.getDatabase(catalogName, dbName);
if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
LOG.debug("Skipping table under database: {}", dbName);
return tableNames;
}
if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
LOG.info("Skipping table that belongs to database {} being failed over.", dbName);
return tableNames;
}
tableNames = client.listTableNamesByFilter(catalogName, dbName, tableFilter, -1);
return tableNames;
}

public static class Builder {
private final IMetaStoreClient client;
private final String catalogName;
@@ -101,7 +101,7 @@ public void run() {
.tableCondition(
hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ")
.build()
.getTables();
.getTableNames();

if (candidates.isEmpty()) {
LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);