
Commit 0bdbda0

Author: Hongdan Zhu (committed)
HIVE-28755: Statistics Management Task

1 parent e34e87f, commit 0bdbda0

File tree

6 files changed: +294, -2 lines changed


standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java

Lines changed: 15 additions & 0 deletions
@@ -1290,6 +1290,21 @@ public enum ConfVars {
         "metastore.partition.management.table.pattern", "*",
         "Automatic partition management will look for tables using the specified table pattern"),

+    STATISTICS_MANAGEMENT_TASK_FREQUENCY("metastore.statistics.management.task.frequency",
+        "metastore.statistics.management.task.frequency",
+        365, TimeUnit.DAYS, "Frequency at which the timer task runs to do automatic statistics management for tables\n" +
+            "with the table property 'statistics.auto.deletion'='true'. Statistics management involves two configs.\n" +
+            "One is 'statistics.auto.deletion', and the other is 'statistics.retention.period'.\n" +
+            "When 'statistics.auto.deletion'='true' is set, statistics management will look for tables whose\n" +
+            "column statistics are older than the retention period, and then delete those column stats.\n"),
+    STATISTICS_RETENTION_PERIOD("metastore.statistics.retention.period",
+        "metastore.statistics.retention.period", 365, TimeUnit.DAYS, "The retention period " +
+            "for which we keep the stats of each table: if the stats are older than this period\n" +
+            "of time, they will be automatically deleted.\n"),
+
+    STATISTICS_AUTO_DELETION("statistics.auto.deletion", "statistics.auto.deletion", true,
+        "Whether table/partition column statistics will be auto deleted after the retention period"),
+
     METASTORE_METADATA_TRANSFORMER_CLASS("metastore.metadata.transformer.class", "metastore.metadata.transformer.class",
         "org.apache.hadoop.hive.metastore.MetastoreDefaultTransformer",
         "Fully qualified class name for the metastore metadata transformer class \n"

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java

Lines changed: 4 additions & 0 deletions
@@ -7334,6 +7334,10 @@ public boolean delete_column_statistics_req(DeleteColumnStatisticsRequest req) t
       }
       ret = rawStore.deletePartitionColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
           partNames, colNames, engine);
+      // TODO: stats can also be deleted through the alter table / alter partition APIs;
+      // in that case we don't want additional notifications to be emitted from here.
+      // Double check whether certain column stats can still be computed afterwards, and whether this
+      // should also update the 'accurate' marker in the table properties; change it if needed.
       if (ret) {
         eventType = EventType.DELETE_PARTITION_COLUMN_STAT;
         for (String colName : colNames == null ? table.getSd().getCols().stream().map(FieldSchema::getName)
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DeleteColumnStatisticsRequest;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.TimeValidator;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+
+/**
+ * Statistics management task, primarily responsible for auto deletion of table column stats at a configurable frequency.
+ *
+ * If some table column statistics are older than the retention period, they should be deleted automatically.
+ * Statistics retention - if the "statistics.retention.period" config is set to a retention interval, then when this
+ * metastore task runs periodically, it deletes column statistics whose age (lastAnalyzed time) is greater than the
+ * retention period. Only the statistics are deleted; the underlying table and partition data are untouched.
+ */
+public class StatisticsManagementTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagementTask.class);
+
+  // The two table properties users can set: whether stats are auto deleted, and how long they are retained
+  public static final String STATISTICS_AUTO_DELETION = "statistics.auto.deletion";
+  public static final String STATISTICS_RETENTION_PERIOD = "statistics.retention.period";
+
+  // An optional table property; if it is not present on a table, the table is not excluded from auto deletion
+  public static final String STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY = "statistics.auto.deletion.exclude";
+
+  private static final Lock lock = new ReentrantLock();
+
+  // these are just for testing
+  private static int completedAttempts;
+  private static int skippedAttempts;
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_MANAGEMENT_TASK_FREQUENCY, unit);
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    // the task may modify its conf locally, so keep a private copy
+    conf = new Configuration(configuration);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  // What this run() method needs to do:
+  // 1. read the "lastAnalyzed" information from TAB_COL_STATS and find all column stats due for deletion
+  // 2. delete those column stats
+  @Override
+  public void run() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Auto statistics deletion started. Cleaning up table/partition column statistics over the retention period.");
+    }
+    long retentionMillis = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_RETENTION_PERIOD, TimeUnit.MILLISECONDS);
+    if (retentionMillis <= 0 || !MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.STATISTICS_AUTO_DELETION)) {
+      LOG.info("Statistics auto deletion is currently turned off.");
+      return;
+    }
+    if (lock.tryLock()) {
+      skippedAttempts = 0;
+      IMetaStoreClient msc = null;
+      try {
+        msc = new HiveMetaStoreClient(conf);
+        // Compute the "lastAnalyzed" cutoff. The retention period is read in milliseconds (default 365 days),
+        // while lastAnalyzed is stored in epoch seconds, so convert the cutoff to seconds.
+        long now = System.currentTimeMillis();
+        long lastAnalyzedThreshold = (now - retentionMillis) / 1000;
+
+        // Get all databases from metastore
+        List<String> databases = msc.getAllDatabases();
+        // Query the backing store directly for table column stats older than the cutoff
+        RawStore ms = HMSHandler.getMSForConf(conf);
+        PersistenceManager pm = ((ObjectStore) ms).getPersistenceManager();
+        Query q = pm.newQuery("SELECT FROM org.apache.hadoop.hive.metastore.model.MTableColumnStatistics");
+        q.setFilter("lastAnalyzed < " + lastAnalyzedThreshold);
+        List<MTableColumnStatistics> results = (List<MTableColumnStatistics>) q.execute();
+
+        for (MTableColumnStatistics stat : results) {
+          String dbName = stat.getTable().getDatabase().getName();
+          String tblName = stat.getTable().getTableName();
+          Map<String, String> tblParams = stat.getTable().getParameters();
+          if (tblParams != null && tblParams.getOrDefault(STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY, null) != null) {
+            LOG.info("Skipping table {}.{} due to exclude property.", dbName, tblName);
+            continue;
+          }
+          /*
+          // if this table carries "lastAnalyzed" as a table property, we process the auto stats deletion
+          long lastAnalyzed = stat.getLastAnalyzed();
+          // lastAnalyzed is in seconds; switch it to milliseconds
+          lastAnalyzed *= 1000;
+          */
+
+          DeleteColumnStatisticsRequest request = new DeleteColumnStatisticsRequest(dbName, tblName);
+          request.setEngine("hive");
+          boolean isPartitioned = stat.getTable().getPartitionKeys() != null && !stat.getTable().getPartitionKeys().isEmpty();
+          // Delete table-level column statistics for unpartitioned tables, partition-level ones otherwise
+          request.setTableLevel(!isPartitioned);
+          msc.deleteColumnStatistics(request);
+        }
+      } catch (Exception e) {
+        LOG.error("Error during statistics auto deletion", e);
+      } finally {
+        if (msc != null) {
+          msc.close();
+        }
+        lock.unlock();
+      }
+    } else {
+      skippedAttempts++;
+    }
+  }
+
+}
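
One detail of run() that is easy to miss: the retention period is read in milliseconds, while the lastAnalyzed value stored with the column statistics is an epoch timestamp in seconds, so the cutoff has to be converted before the comparison. A small self-contained sketch of that arithmetic, with an illustratively 400-day-old statistic (the class name and numbers are made up for illustration):

import java.util.concurrent.TimeUnit;

public class RetentionCutoffExample {
  public static void main(String[] args) {
    long retentionMillis = TimeUnit.DAYS.toMillis(365);   // metastore.statistics.retention.period
    long nowMillis = System.currentTimeMillis();

    // lastAnalyzed is stored in epoch seconds, so the cutoff is converted to seconds as well
    long cutoffSeconds = (nowMillis - retentionMillis) / 1000;

    // a statistic last analyzed 400 days ago is older than the cutoff and is due for deletion
    long lastAnalyzedSeconds = (nowMillis - TimeUnit.DAYS.toMillis(400)) / 1000;
    System.out.println("delete these column stats? " + (lastAnalyzedSeconds < cutoffSeconds)); // prints true
  }
}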
org.apache.hadoop.hive.metastore.TestStatisticsManagement (new file)

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.jetbrains.annotations.NotNull;
+
+import javax.rmi.CORBA.Util;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestStatisticsManagement {
+
+}
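
The test class above is still an empty stub in this commit. A minimal sketch of the kind of unit test it could grow into, assuming JUnit 4 and MetastoreConf's setTimeVar/getTimeVar helpers; the one-day frequency is only an illustrative value:

package org.apache.hadoop.hive.metastore;

import static org.junit.Assert.assertEquals;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Test;

public class TestStatisticsManagement {

  @Test
  public void testRunFrequencyComesFromConf() {
    Configuration conf = MetastoreConf.newMetastoreConf();
    // hypothetical test value; the shipped default is 365 days
    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_MANAGEMENT_TASK_FREQUENCY, 1, TimeUnit.DAYS);

    StatisticsManagementTask task = new StatisticsManagementTask();
    task.setConf(conf);

    assertEquals(1, task.runFrequency(TimeUnit.DAYS));
  }
}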

standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java

Lines changed: 7 additions & 2 deletions
@@ -70,6 +70,7 @@
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkOpenTxns;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkPartitionManagement;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkRenameTable;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkStatisticsManagement;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkTableCreate;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkUpdatePartitionsStat;
 import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri;

@@ -303,7 +304,9 @@ private void runNonAcidBenchmarks() {
         .add("openTxn",
             () -> benchmarkOpenTxns(bench, bData, 1))
         .add("PartitionManagementTask",
-            () -> benchmarkPartitionManagement(bench, bData, 1));
+            () -> benchmarkPartitionManagement(bench, bData, 1))
+        .add("StatisticsManagementTask",
+            () -> benchmarkStatisticsManagement(bench, bData, 1));

     for (int howMany: instances) {
       suite.add("listTables" + '.' + howMany,
@@ -345,7 +348,9 @@ private void runNonAcidBenchmarks() {
         .add("openTxns" + '.' + howMany,
             () -> benchmarkOpenTxns(bench, bData, howMany))
         .add("PartitionManagementTask" + "." + howMany,
-            () -> benchmarkPartitionManagement(bench, bData, howMany));
+            () -> benchmarkPartitionManagement(bench, bData, howMany))
+        .add("StatisticsManagementTask" + "." + howMany,
+            () -> benchmarkStatisticsManagement(bench, bData, howMany));
     }

     List<String> toRun = suite.listMatching(matches, exclude);

standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java

Lines changed: 66 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.PartitionManagementTask;
+import org.apache.hadoop.hive.metastore.StatisticsManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;

@@ -42,6 +43,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

@@ -695,4 +697,68 @@ static DescriptiveStatistics benchmarkPartitionManagement(@NotNull MicroBenchmar
     }
   }

+  static DescriptiveStatistics benchmarkStatisticsManagement(@NotNull MicroBenchmark bench,
+                                                             @NotNull BenchData data,
+                                                             int tableCount) {
+
+    String dbName = data.dbName + "_" + tableCount;
+    String tableNamePrefix = data.tableName;
+    final HMSClient client = data.getClient();
+    final StatisticsManagementTask statsTask = new StatisticsManagementTask();
+    final FileSystem fs;
+    try {
+      fs = FileSystem.get(client.getHadoopConf());
+      client.getHadoopConf().set("hive.metastore.uris", client.getServerURI().toString());
+      client.getHadoopConf().set("metastore.statistics.management.database.pattern", dbName);
+      statsTask.setConf(client.getHadoopConf());
+
+      client.createDatabase(dbName);
+      for (int i = 0; i < tableCount; i++) {
+        String tableName = tableNamePrefix + "_" + i;
+        Util.TableBuilder tableBuilder = new Util.TableBuilder(dbName, tableName)
+            .withType(TableType.MANAGED_TABLE)
+            .withColumns(createSchema(Arrays.asList("col1:string", "col2:int")))
+            .withPartitionKeys(createSchema(Collections.singletonList("part_col")))
+            .withParameter("columnStatsAccurate", "true");
+
+        client.createTable(tableBuilder.build());
+        addManyPartitionsNoException(client, dbName, tableName, null, Collections.singletonList("part_col"), 100);
+
+        // simulate partitions whose stats carry an old "lastAnalyzed" value
+        List<Partition> partitions = client.listPartitions(dbName, tableName);
+        for (Partition partition : partitions) {
+          Map<String, String> params = partition.getParameters();
+          // manually set "lastAnalyzed" to an old time, e.g. 400 days ago
+          params.put("lastAnalyzed", String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(400)));
+          client.alterPartition(dbName, tableName, partition);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }

+    Runnable preRun = () -> {
+      System.out.println("Preparing for benchmark...");
+    };
+
+    try {
+      DescriptiveStatistics stats = bench.measure(preRun, statsTask, null);
+
+      // check that the stats were deleted
+      for (int i = 0; i < tableCount; i++) {
+        String tableName = tableNamePrefix + "_" + i;
+        List<Partition> partitions = client.listPartitions(dbName, tableName);
+        for (Partition partition : partitions) {
+          Map<String, String> params = partition.getParameters();
+          if (params.containsKey("lastAnalyzed")) {
+            throw new AssertionError("Partition stats not deleted for table: " + tableName);
+          }
+        }
+      }
+      return stats;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 }
