diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 85b37d67889d..1e86152ee16f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1292,6 +1292,21 @@ public enum ConfVars {
         "metastore.partition.management.table.pattern", "*",
         "Automatic partition management will look for tables using the specified table pattern"),
+    STATISTICS_MANAGEMENT_TASK_FREQUENCY("metastore.statistics.management.task.frequency",
+        "metastore.statistics.management.task.frequency",
+        7, TimeUnit.DAYS, "Frequency at which the timer task runs to do automatic statistics management for tables\n" +
+        "with table property 'statistics.auto.deletion'='true'. Statistics management includes two configs.\n" +
+        "One is 'statistics.auto.deletion', and the other is 'statistics.retention.period'.\n" +
+        "When 'statistics.auto.deletion'='true' is set, statistics management will look for tables whose\n" +
+        "column statistics are older than the retention period, and then delete those column stats.\n"),
+    STATISTICS_RETENTION_PERIOD("metastore.statistics.retention.period",
+        "metastore.statistics.retention.period", 365, TimeUnit.DAYS, "The retention period " +
+        "for which we want to keep the stats of each table; if the stats are older than this period\n" +
+        "of time, they will be automatically deleted.\n"),
+
+    STATISTICS_AUTO_DELETION("statistics.auto.deletion", "statistics.auto.deletion", true,
+        "Whether table/partition column statistics will be auto deleted after the retention period"),
+
     METASTORE_METADATA_TRANSFORMER_CLASS("metastore.metadata.transformer.class",
         "metastore.metadata.transformer.class",
         "org.apache.hadoop.hive.metastore.MetastoreDefaultTransformer",
         "Fully qualified class name for the metastore metadata transformer class \n"
@@ -1526,7 +1541,8 @@ public enum ConfVars {
             ACID_METRICS_TASK_CLASS + "," + ACID_METRICS_LOGGER_CLASS + "," +
             "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask" + "," +
             "org.apache.hadoop.hive.metastore.ScheduledQueryExecutionsMaintTask" + "," +
-            "org.apache.hadoop.hive.metastore.ReplicationMetricsMaintTask",
+            "org.apache.hadoop.hive.metastore.ReplicationMetricsMaintTask" + "," +
+            "org.apache.hadoop.hive.metastore.StatisticsManagementTask",
         "Comma separated list of tasks that will be started in separate threads. These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode. They must implement " + METASTORE_TASK_THREAD_CLASS),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java
new file mode 100644
index 000000000000..becb4b4cfef6
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.DeleteColumnStatisticsRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+
+/**
+ * Metastore background task responsible for automatic deletion of table/partition column
+ * statistics. If some column statistics are older than the configured retention interval
+ * (MetastoreConf.ConfVars.STATISTICS_RETENTION_PERIOD), they are deleted when this task runs.
+ * The task runs at the frequency given by
+ * MetastoreConf.ConfVars.STATISTICS_MANAGEMENT_TASK_FREQUENCY and is a no-op unless
+ * MetastoreConf.ConfVars.STATISTICS_AUTO_DELETION is enabled.
+ */
+public class StatisticsManagementTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagementTask.class);
+
+  // Optional table property; when set to "true" on a table, that table's statistics are
+  // never auto-deleted even if they are older than the retention period.
+  public static final String STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY = "statistics.auto.deletion.exclude";
+
+  // Guards against overlapping executions when the task is scheduled concurrently.
+  private static final Lock lock = new ReentrantLock();
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_MANAGEMENT_TASK_FREQUENCY, unit);
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Finds all column statistics rows whose "lastAnalyzed" is older than the retention
+   * threshold and deletes them through the metastore client.
+   */
+  @Override
+  public void run() {
+    LOG.debug("Auto statistics deletion started. Cleaning up table/partition column statistics over the retention period.");
+    long retentionMillis = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.STATISTICS_RETENTION_PERIOD, TimeUnit.MILLISECONDS);
+    if (retentionMillis <= 0 || !MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.STATISTICS_AUTO_DELETION)) {
+      LOG.info("Statistics auto deletion is set to off currently.");
+      return;
+    }
+    // Skip this run entirely if a previous run is still in progress.
+    if (!lock.tryLock()) {
+      return;
+    }
+    try {
+      long now = System.currentTimeMillis();
+      // lastAnalyzed is stored in seconds since epoch, hence the /1000 — TODO confirm against schema.
+      long lastAnalyzedThreshold = (now - retentionMillis) / 1000;
+
+      String filter = "lastAnalyzed < threshold";
+      String paramStr = "long threshold";
+      try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
+        RawStore ms = HMSHandler.getMSForConf(conf);
+        PersistenceManager pm = ((ObjectStore) ms).getPersistenceManager();
+
+        Query q = null;
+        try {
+          q = pm.newQuery(MTableColumnStatistics.class);
+          q.setFilter(filter);
+          q.declareParameters(paramStr);
+          // Only fetch the required fields; avoids materializing heavy MTable objects.
+          q.setResult(
+              "table.database.name, "
+                  + "table.tableName, "
+                  + "partitionName, "
+                  + "table.parameters.get(\"" + STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY + "\")"
+          );
+          @SuppressWarnings("unchecked")
+          List<Object[]> rows = (List<Object[]>) q.execute(lastAnalyzedThreshold);
+
+          for (Object[] row : rows) {
+            String dbName = (String) row[0];
+            String tblName = (String) row[1];
+            String partName = (String) row[2]; // null for table-level stats
+            String excludeVal = (String) row[3]; // null when the table property is unset
+
+            // Exclude only when the property is explicitly "true"; a value such as "false"
+            // must not suppress deletion.
+            if (Boolean.parseBoolean(excludeVal)) {
+              LOG.info("Skipping auto deletion of stats for table {}.{} due to {} property being set on the table.",
+                  dbName, tblName, STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY);
+              continue;
+            }
+            DeleteColumnStatisticsRequest request = new DeleteColumnStatisticsRequest(dbName, tblName);
+            request.setEngine("hive");
+
+            // Table-level rows carry no partition name; for partition-level rows restrict the
+            // deletion to that single stale partition instead of the whole table.
+            request.setTableLevel(partName == null);
+            if (partName != null) {
+              request.addToPart_names(partName);
+            }
+            msc.deleteColumnStatistics(request);
+          }
+        } finally {
+          if (q != null) {
+            q.closeAll();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during statistics auto deletion", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+}
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 551ffabe6b9a..aac06202699d 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -70,6 +70,7 @@
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkOpenTxns;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkPartitionManagement;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkRenameTable;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkStatisticsManagement;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkTableCreate;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkUpdatePartitionsStat;
 import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri;
@@ -303,7 +304,9 @@ private void runNonAcidBenchmarks() {
         .add("openTxn", () -> benchmarkOpenTxns(bench, bData, 1))
         .add("PartitionManagementTask",
-            () -> benchmarkPartitionManagement(bench, bData, 1));
+            () -> benchmarkPartitionManagement(bench, bData, 1))
+        .add("StatisticsManagementTask",
+            () -> benchmarkStatisticsManagement(bench, bData, 1));
 
     for (int howMany: instances) {
       suite.add("listTables" + '.' + howMany,
@@ -345,7 +348,9 @@ private void runNonAcidBenchmarks() {
           .add("openTxns" + '.' + howMany,
               () -> benchmarkOpenTxns(bench, bData, howMany))
           .add("PartitionManagementTask" + "." + howMany,
-              () -> benchmarkPartitionManagement(bench, bData, howMany));
+              () -> benchmarkPartitionManagement(bench, bData, howMany))
+          .add("StatisticsManagementTask" + "." + howMany,
+              () -> benchmarkStatisticsManagement(bench, bData, howMany));
     }
 
     List<String> toRun = suite.listMatching(matches, exclude);
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index c01200c33be5..e49c2ca7ee46 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.PartitionManagementTask;
+import org.apache.hadoop.hive.metastore.StatisticsManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -42,6 +43,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -695,4 +697,64 @@ static DescriptiveStatistics benchmarkPartitionManagement(@NotNull MicroBenchmar
     }
   }
 
+  static DescriptiveStatistics benchmarkStatisticsManagement(@NotNull MicroBenchmark bench,
+                                                             @NotNull BenchData data,
+                                                             int tableCount) {
+
+    String dbName = data.dbName + "_" + tableCount;
+    String tableNamePrefix = data.tableName;
+    final HMSClient client = data.getClient();
+    final StatisticsManagementTask statsTask = new StatisticsManagementTask();
+    try {
+      client.getHadoopConf().set("hive.metastore.uris", client.getServerURI().toString());
+      client.getHadoopConf().set("metastore.statistics.management.database.pattern", dbName);
+      statsTask.setConf(client.getHadoopConf());
+
+      client.createDatabase(dbName);
+      for (int i = 0; i < tableCount; i++) {
+        String tableName = tableNamePrefix + "_" + i;
+        Util.TableBuilder tableBuilder = new Util.TableBuilder(dbName, tableName)
+            .withType(TableType.MANAGED_TABLE)
+            .withColumns(createSchema(Arrays.asList("col1:string", "col2:int")))
+            .withPartitionKeys(createSchema(Collections.singletonList("part_col")))
+            .withParameter("columnStatsAccurate", "true");
+
+        client.createTable(tableBuilder.build());
+        addManyPartitionsNoException(client, dbName, tableName, null, Collections.singletonList("part_col"), 100);
+
+        // Backdate each partition's stats so they fall outside the retention period (400 days).
+        List<Partition> partitions = client.listPartitions(dbName, tableName);
+        for (Partition partition : partitions) {
+          Map<String, String> params = partition.getParameters();
+          // lastAnalyzed is stored in seconds since epoch, so convert after backdating.
+          params.put("lastAnalyzed",
+              String.valueOf((System.currentTimeMillis() - TimeUnit.DAYS.toMillis(400)) / 1000));
+        }
+        client.alterPartitions(dbName, tableName, partitions);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    Runnable preRun = () -> LOG.debug("Preparing for benchmark...");
+
+    try {
+      DescriptiveStatistics stats = bench.measure(preRun, statsTask, null);
+
+      // Verify the task actually removed the backdated stats.
+      for (int i = 0; i < tableCount; i++) {
+        String tableName = tableNamePrefix + "_" + i;
+        List<Partition> partitions = client.listPartitions(dbName, tableName);
+        for (Partition partition : partitions) {
+          Map<String, String> params = partition.getParameters();
+          if (params.containsKey("lastAnalyzed")) {
+            throw new AssertionError("Partition stats not deleted for table: " + tableName);
+          }
+        }
+      }
+      return stats;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 }