Skip to content

Commit 6f311cc

Browse files
branch-4.0: [enhance](iceberg) Refactor Iceberg metadata cache structure and add table cache test cases #59716 (#60341)
Cherry-picked from #59716 Co-authored-by: Socrates <[email protected]>
1 parent fc74854 commit 6f311cc

File tree

8 files changed

+626
-197
lines changed

8 files changed

+626
-197
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,7 @@ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional
11501150
return HudiUtils.getHudiMvccSnapshot(tableSnapshot, this);
11511151
} else if (getDlaType() == DLAType.ICEBERG) {
11521152
return new IcebergMvccSnapshot(
1153-
IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, this, scanParams));
1153+
IcebergUtils.getSnapshotCacheValue(tableSnapshot, this, scanParams));
11541154
} else {
11551155
return new EmptyMvccSnapshot();
11561156
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.doris.catalog.PartitionItem;
2222
import org.apache.doris.catalog.PartitionType;
2323
import org.apache.doris.common.AnalysisException;
24-
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
2524
import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
2625
import org.apache.doris.datasource.iceberg.IcebergUtils;
2726
import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -52,9 +51,7 @@ public IcebergDlaTable(HMSExternalTable table) {
5251

5352
@Override
5453
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
55-
return Maps.newHashMap(
56-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable)
57-
.getPartitionInfo().getNameToPartitionItem());
54+
return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, hmsTable));
5855
}
5956

6057
@Override
@@ -69,19 +66,13 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
6966

7067
@Override
7168
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
72-
IcebergSnapshotCacheValue snapshotValue =
73-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
74-
IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
75-
hmsTable,
76-
snapshotValue.getSnapshot().getSchemaId());
77-
return schemaValue.getPartitionColumns();
69+
return IcebergUtils.getIcebergPartitionColumns(snapshot, hmsTable);
7870
}
7971

8072
@Override
8173
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
8274
Optional<MvccSnapshot> snapshot) throws AnalysisException {
83-
IcebergSnapshotCacheValue snapshotValue =
84-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
75+
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
8576
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
8677
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
8778
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
@@ -102,16 +93,14 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
10293
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
10394
throws AnalysisException {
10495
hmsTable.makeSureInitialized();
105-
IcebergSnapshotCacheValue snapshotValue =
106-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
96+
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
10797
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
10898
}
10999

110100
@Override
111101
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
112102
hmsTable.makeSureInitialized();
113-
IcebergSnapshotCacheValue snapshotValue =
114-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
103+
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
115104
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
116105
}
117106

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
5353
public static final String ICEBERG_S3_TABLES = "s3tables";
5454
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
5555
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
56-
public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
5756
public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
5857
public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
5958
public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
@@ -96,15 +95,6 @@ public void checkProperties() throws DdlException {
9695
+ tableMetaCacheTtlSecond);
9796
}
9897

99-
// check iceberg.snapshot.meta.cache.ttl-second parameter
100-
String partitionCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
101-
if (Objects.nonNull(partitionCacheTtlSecond) && NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
102-
< CACHE_TTL_DISABLE_CACHE) {
103-
throw new DdlException(
104-
"The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
105-
+ partitionCacheTtlSecond);
106-
}
107-
10898
String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
10999
if (Objects.nonNull(manifestCacheEnable)
110100
&& !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) {
@@ -134,8 +124,7 @@ public void checkProperties() throws DdlException {
134124
public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
135125
super.notifyPropertiesUpdated(updatedProps);
136126
String tableMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
137-
String snapshotMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
138-
if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
127+
if (Objects.nonNull(tableMetaCacheTtl)) {
139128
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
140129
}
141130
String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,12 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
135135

136136
@Override
137137
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
138-
return Maps.newHashMap(
139-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
140-
.getPartitionInfo().getNameToPartitionItem());
138+
return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, this));
141139
}
142140

143141
@Override
144142
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
145-
return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
146-
.getPartitionInfo().getNameToPartitionItem();
143+
return IcebergUtils.getIcebergPartitionItems(snapshot, this);
147144
}
148145

149146
@Override
@@ -158,18 +155,13 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) thro
158155

159156
@Override
160157
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
161-
IcebergSnapshotCacheValue snapshotValue =
162-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
163-
IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
164-
this, snapshotValue.getSnapshot().getSchemaId());
165-
return schemaValue.getPartitionColumns();
158+
return IcebergUtils.getIcebergPartitionColumns(snapshot, this);
166159
}
167160

168161
@Override
169162
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
170163
Optional<MvccSnapshot> snapshot) throws AnalysisException {
171-
IcebergSnapshotCacheValue snapshotValue =
172-
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
164+
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
173165
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
174166
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
175167
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
@@ -195,13 +187,13 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<Mvcc
195187
@Override
196188
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
197189
makeSureInitialized();
198-
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
190+
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
199191
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
200192
}
201193

202194
@Override
203195
public long getNewestUpdateVersionOrTime() {
204-
return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), this, Optional.empty())
196+
return IcebergUtils.getLatestSnapshotCacheValue(this)
205197
.getPartitionInfo().getNameToIcebergPartition().values().stream()
206198
.mapToLong(IcebergPartition::getLastUpdateTime).max().orElse(0);
207199
}
@@ -256,7 +248,7 @@ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional
256248
if (isView()) {
257249
return new EmptyMvccSnapshot();
258250
} else {
259-
return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
251+
return new IcebergMvccSnapshot(IcebergUtils.getSnapshotCacheValue(
260252
tableSnapshot, this, scanParams));
261253
}
262254
}

0 commit comments

Comments
 (0)