Skip to content

Commit dd2e0c2

Browse files
committed
Unify MTMV metadata access via engine cache bridge
1 parent 8c58e2e commit dd2e0c2

File tree

9 files changed

+417
-187
lines changed

9 files changed

+417
-187
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
4444
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
4545
import org.apache.doris.datasource.iceberg.IcebergUtils;
46+
import org.apache.doris.datasource.metacache.EngineMtmvSupport;
4647
import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
4748
import org.apache.doris.datasource.mvcc.MvccSnapshot;
4849
import org.apache.doris.datasource.mvcc.MvccTable;
@@ -73,7 +74,6 @@
7374
import org.apache.doris.thrift.TTableDescriptor;
7475
import org.apache.doris.thrift.TTableType;
7576

76-
import com.google.common.collect.BiMap;
7777
import com.google.common.collect.Lists;
7878
import com.google.common.collect.Maps;
7979
import com.google.common.collect.Sets;
@@ -423,23 +423,12 @@ public SelectedPartitions initHudiSelectedPartitions(Optional<TableSnapshot> tab
423423

424424
@Override
425425
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
426-
return getNameToPartitionItems();
426+
makeSureInitialized();
427+
return EngineMtmvSupport.getPartitionItems(this, snapshot);
427428
}
428429

429430
public Map<String, PartitionItem> getNameToPartitionItems() {
430-
if (CollectionUtils.isEmpty(this.getPartitionColumns())) {
431-
return Collections.emptyMap();
432-
}
433-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = getHivePartitionValues(
434-
MvccUtil.getSnapshotFromContext(this));
435-
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
436-
// transfer id to name
437-
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
438-
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
439-
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
440-
nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue());
441-
}
442-
return nameToPartitionItem;
431+
return getNameToPartitionItems(MvccUtil.getSnapshotFromContext(this));
443432
}
444433

445434
public boolean isHiveTransactionalTable() {
@@ -964,42 +953,34 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
964953
@Override
965954
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
966955
throws AnalysisException {
967-
makeSureInitialized();
968-
return dlaTable.getAndCopyPartitionItems(snapshot);
956+
return EngineMtmvSupport.getAndCopyPartitionItems(this, snapshot);
969957
}
970958

971959
@Override
972960
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
973961
Optional<MvccSnapshot> snapshot) throws AnalysisException {
974-
makeSureInitialized();
975-
return dlaTable.getPartitionSnapshot(partitionName, context, snapshot);
962+
return EngineMtmvSupport.getPartitionSnapshot(this, partitionName, snapshot);
976963
}
977964

978965
@Override
979966
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
980967
throws AnalysisException {
981-
makeSureInitialized();
982-
return dlaTable.getTableSnapshot(context, snapshot);
968+
return EngineMtmvSupport.getTableSnapshot(this, snapshot);
983969
}
984970

985971
@Override
986972
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
987-
makeSureInitialized();
988-
return dlaTable.getTableSnapshot(snapshot);
973+
return EngineMtmvSupport.getTableSnapshot(this, snapshot);
989974
}
990975

991976
@Override
992977
public long getNewestUpdateVersionOrTime() {
993-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = getHivePartitionValues(
994-
MvccUtil.getSnapshotFromContext(this));
995-
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
996-
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
997-
List<HivePartition> partitionList = cache.getAllPartitionsWithCache(this,
998-
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
999-
if (CollectionUtils.isEmpty(partitionList)) {
978+
try {
979+
return getTableSnapshot(MvccUtil.getSnapshotFromContext(this)).getSnapshotVersion();
980+
} catch (AnalysisException e) {
981+
LOG.warn("failed to get newest update version for table {}", getNameWithFullQualifiers(), e);
1000982
return 0;
1001983
}
1002-
return partitionList.stream().mapToLong(HivePartition::getLastModifiedTime).max().orElse(0);
1003984
}
1004985

1005986
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java

Lines changed: 6 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,14 @@
1818
package org.apache.doris.datasource.hive;
1919

2020
import org.apache.doris.catalog.Column;
21-
import org.apache.doris.catalog.Env;
2221
import org.apache.doris.catalog.PartitionItem;
2322
import org.apache.doris.catalog.PartitionType;
2423
import org.apache.doris.common.AnalysisException;
2524
import org.apache.doris.datasource.SchemaCacheValue;
25+
import org.apache.doris.datasource.metacache.EngineMtmvSupport;
2626
import org.apache.doris.datasource.mvcc.MvccSnapshot;
27-
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
2827
import org.apache.doris.mtmv.MTMVRefreshContext;
2928
import org.apache.doris.mtmv.MTMVSnapshotIf;
30-
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
31-
32-
import com.google.common.collect.Lists;
33-
import org.apache.commons.collections4.CollectionUtils;
3429

3530
import java.util.Collections;
3631
import java.util.List;
@@ -64,20 +59,15 @@ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
6459
}
6560

6661
@Override
67-
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
68-
return hmsTable.getNameToPartitionItems();
62+
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
63+
throws AnalysisException {
64+
return EngineMtmvSupport.getAndCopyPartitionItems(hmsTable, snapshot);
6965
}
7066

7167
@Override
7268
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
7369
Optional<MvccSnapshot> snapshot) throws AnalysisException {
74-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = hmsTable.getHivePartitionValues(snapshot);
75-
Long partitionId = getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
76-
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
77-
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
78-
HivePartition hivePartition = getHivePartitionByIdOrAnalysisException(partitionId,
79-
hivePartitionValues, cache);
80-
return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
70+
return EngineMtmvSupport.getPartitionSnapshot(hmsTable, partitionName, snapshot);
8171
}
8272

8373
@Override
@@ -88,53 +78,7 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<Mvcc
8878

8979
@Override
9080
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
91-
if (hmsTable.getPartitionType(snapshot) == PartitionType.UNPARTITIONED) {
92-
return new MTMVMaxTimestampSnapshot(hmsTable.getName(), hmsTable.getLastDdlTime());
93-
}
94-
HivePartition maxPartition = null;
95-
long maxVersionTime = 0L;
96-
long visibleVersionTime;
97-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = hmsTable.getHivePartitionValues(snapshot);
98-
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
99-
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
100-
List<HivePartition> partitionList = cache.getAllPartitionsWithCache(hmsTable,
101-
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
102-
if (CollectionUtils.isEmpty(partitionList)) {
103-
return new MTMVMaxTimestampSnapshot(hmsTable.getName(), 0L);
104-
}
105-
for (HivePartition hivePartition : partitionList) {
106-
visibleVersionTime = hivePartition.getLastModifiedTime();
107-
if (visibleVersionTime > maxVersionTime) {
108-
maxVersionTime = visibleVersionTime;
109-
maxPartition = hivePartition;
110-
}
111-
}
112-
return new MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(
113-
hmsTable.getPartitionColumns()), maxVersionTime);
114-
}
115-
116-
private Long getPartitionIdByNameOrAnalysisException(String partitionName,
117-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
118-
throws AnalysisException {
119-
Long partitionId = hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
120-
if (partitionId == null) {
121-
throw new AnalysisException("can not find partition: " + partitionName);
122-
}
123-
return partitionId;
124-
}
125-
126-
private HivePartition getHivePartitionByIdOrAnalysisException(Long partitionId,
127-
HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
128-
HiveMetaStoreCache cache) throws AnalysisException {
129-
List<String> partitionValues = hivePartitionValues.getPartitionValuesMap().get(partitionId);
130-
if (CollectionUtils.isEmpty(partitionValues)) {
131-
throw new AnalysisException("can not find partitionValues: " + partitionId);
132-
}
133-
HivePartition partition = cache.getHivePartition(hmsTable, partitionValues);
134-
if (partition == null) {
135-
throw new AnalysisException("can not find partition: " + partitionId);
136-
}
137-
return partition;
81+
return EngineMtmvSupport.getTableSnapshot(hmsTable, snapshot);
13882
}
13983

14084
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveEngineCache.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.doris.datasource.hive;
1919

20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.PartitionType;
2022
import org.apache.doris.common.Config;
2123
import org.apache.doris.datasource.ExternalCatalog;
2224
import org.apache.doris.datasource.ExternalTable;
@@ -30,7 +32,9 @@
3032

3133
import com.github.benmanes.caffeine.cache.stats.CacheStats;
3234
import com.google.common.collect.ImmutableList;
35+
import org.apache.commons.collections4.CollectionUtils;
3336

37+
import java.util.ArrayList;
3438
import java.util.List;
3539
import java.util.Objects;
3640
import java.util.Optional;
@@ -85,12 +89,38 @@ public void invalidateTable(ExternalTable table) {
8589

8690
@Override
8791
public EnginePartitionInfo getPartitionInfo(ExternalTable table, Optional<MvccSnapshot> snapshot) {
88-
throw new UnsupportedOperationException("Hive does not support getPartitionInfo via engine cache");
92+
HMSExternalTable hmsTable = asHiveTable(table);
93+
return new HivePartition(metaStoreCache.getPartitionValues(
94+
hmsTable, hmsTable.getPartitionColumnTypes(snapshot)));
8995
}
9096

9197
@Override
9298
public EngineSnapshot getSnapshot(ExternalTable table, Optional<MvccSnapshot> snapshot) {
93-
throw new UnsupportedOperationException("Hive does not support getSnapshot via engine cache");
99+
HMSExternalTable hmsTable = asHiveTable(table);
100+
if (hmsTable.getPartitionType(snapshot) == PartitionType.UNPARTITIONED) {
101+
return new HiveSnapshotMeta(hmsTable.getName(), hmsTable.getLastDdlTime());
102+
}
103+
HiveMetaStoreCache.HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(
104+
hmsTable, hmsTable.getPartitionColumnTypes(snapshot));
105+
List<HivePartition> partitions = metaStoreCache.getAllPartitionsWithCache(
106+
hmsTable, new ArrayList<>(partitionValues.getPartitionValuesMap().values()));
107+
if (CollectionUtils.isEmpty(partitions)) {
108+
return new HiveSnapshotMeta(hmsTable.getName(), 0L);
109+
}
110+
HivePartition maxPartition = null;
111+
long maxTimestamp = 0L;
112+
List<Column> partitionColumns = hmsTable.getPartitionColumns(snapshot);
113+
for (HivePartition hivePartition : partitions) {
114+
long currentTimestamp = hivePartition.getLastModifiedTime();
115+
if (currentTimestamp > maxTimestamp) {
116+
maxTimestamp = currentTimestamp;
117+
maxPartition = hivePartition;
118+
}
119+
}
120+
if (maxPartition == null) {
121+
return new HiveSnapshotMeta(hmsTable.getName(), 0L);
122+
}
123+
return new HiveSnapshotMeta(maxPartition.getPartitionName(partitionColumns), maxTimestamp);
94124
}
95125

96126
@Override
@@ -102,6 +132,49 @@ public HiveMetaStoreCache getMetaStoreCache() {
102132
return metaStoreCache;
103133
}
104134

135+
private HMSExternalTable asHiveTable(ExternalTable table) {
136+
if (!(table instanceof HMSExternalTable)) {
137+
throw new IllegalArgumentException("Expected HMSExternalTable for hive engine cache, but got "
138+
+ table.getClass().getSimpleName());
139+
}
140+
HMSExternalTable hmsTable = (HMSExternalTable) table;
141+
if (hmsTable.getDlaType() != HMSExternalTable.DLAType.HIVE) {
142+
throw new IllegalArgumentException("Expected HIVE DLA type for hive engine cache, but got "
143+
+ hmsTable.getDlaType() + ", table=" + hmsTable.getNameWithFullQualifiers());
144+
}
145+
return hmsTable;
146+
}
147+
148+
public static final class HivePartition implements EnginePartitionInfo {
149+
private final HiveMetaStoreCache.HivePartitionValues partitionValues;
150+
151+
public HivePartition(HiveMetaStoreCache.HivePartitionValues partitionValues) {
152+
this.partitionValues = partitionValues;
153+
}
154+
155+
public HiveMetaStoreCache.HivePartitionValues getPartitionValues() {
156+
return partitionValues;
157+
}
158+
}
159+
160+
public static final class HiveSnapshotMeta implements EngineSnapshot {
161+
private final String partitionName;
162+
private final long timestamp;
163+
164+
public HiveSnapshotMeta(String partitionName, long timestamp) {
165+
this.partitionName = partitionName;
166+
this.timestamp = timestamp;
167+
}
168+
169+
public String getPartitionName() {
170+
return partitionName;
171+
}
172+
173+
public long getTimestamp() {
174+
return timestamp;
175+
}
176+
}
177+
105178
private static class HivePartitionValuesModule implements CacheModule {
106179
private final HiveMetaStoreCache metaStoreCache;
107180
private final CacheSpec cacheSpec;

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@
2525
import org.apache.doris.datasource.CacheException;
2626
import org.apache.doris.datasource.ExternalSchemaCache;
2727
import org.apache.doris.datasource.SchemaCacheValue;
28-
import org.apache.doris.datasource.TablePartitionValues;
2928
import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
3029
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
3130
import org.apache.doris.datasource.hudi.HudiUtils;
31+
import org.apache.doris.datasource.metacache.EngineMtmvSupport;
3232
import org.apache.doris.datasource.mvcc.MvccSnapshot;
3333
import org.apache.doris.mtmv.MTMVRefreshContext;
3434
import org.apache.doris.mtmv.MTMVSnapshotIf;
35-
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
36-
37-
import com.google.common.collect.Maps;
3835

3936
import java.util.List;
4037
import java.util.Map;
@@ -65,24 +62,15 @@ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
6562
}
6663

6764
@Override
68-
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
69-
TablePartitionValues tablePartitionValues = getOrFetchHudiSnapshotCacheValue(snapshot);
70-
Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
71-
Map<Long, String> partitionIdToNameMap = tablePartitionValues.getPartitionIdToNameMap();
72-
Map<String, PartitionItem> copiedPartitionItems = Maps.newHashMap();
73-
for (Long key : partitionIdToNameMap.keySet()) {
74-
copiedPartitionItems.put(partitionIdToNameMap.get(key), idToPartitionItem.get(key));
75-
}
76-
return copiedPartitionItems;
65+
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
66+
throws AnalysisException {
67+
return EngineMtmvSupport.getAndCopyPartitionItems(hmsTable, snapshot);
7768
}
7869

7970
@Override
8071
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
8172
Optional<MvccSnapshot> snapshot) throws AnalysisException {
82-
// Map<String, Long> partitionNameToLastModifiedMap = getOrFetchHudiSnapshotCacheValue(
83-
// snapshot).getPartitionNameToLastModifiedMap();
84-
// return new MTMVTimestampSnapshot(partitionNameToLastModifiedMap.get(partitionName));
85-
return new MTMVTimestampSnapshot(0L);
73+
return EngineMtmvSupport.getPartitionSnapshot(hmsTable, partitionName, snapshot);
8674
}
8775

8876
@Override
@@ -93,23 +81,14 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<Mvcc
9381

9482
@Override
9583
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
96-
// return new MTMVTimestampSnapshot(getOrFetchHudiSnapshotCacheValue(snapshot).getLastUpdateTimestamp());
97-
return new MTMVTimestampSnapshot(0L);
84+
return EngineMtmvSupport.getTableSnapshot(hmsTable, snapshot);
9885
}
9986

10087
@Override
10188
public boolean isPartitionColumnAllowNull() {
10289
return true;
10390
}
10491

105-
private TablePartitionValues getOrFetchHudiSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
106-
if (snapshot.isPresent()) {
107-
return ((HudiMvccSnapshot) snapshot.get()).getTablePartitionValues();
108-
} else {
109-
return HudiUtils.getPartitionValues(Optional.empty(), hmsTable);
110-
}
111-
}
112-
11392
public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
11493
long timestamp = 0L;
11594
if (snapshot.isPresent()) {

0 commit comments

Comments
 (0)