Skip to content

Commit a0ecbae

Browse files
zddr and seawinde authored
[feature](mtmv)MTMV refresh support multi pct tables (#56958)
### What problem does this PR solve?

Currently, a materialized view can have only one PCT table. If the materialized view references multiple partitioned tables and a change occurs in a non-PCT table, then even if only some partitions are modified, the materialized view must be fully refreshed, which is costly.

Therefore, support for multiple PCT tables is needed, with the following restrictions:

- Only inner joins and unions (including UNION ALL) are supported.
- In the case of unions, all base tables must be PCT tables; otherwise, the derivation fails.
- The partition granularity across multiple PCT tables must align.

For example, the following scenario is allowed:

- t1 has 2 partitions: [2020-01-01, 2020-01-02), [2020-01-02, 2020-01-03)
- t2 has 2 partitions: [2020-01-02, 2020-01-03), [2020-01-03, 2020-01-04)

The following scenario is not allowed:

- t1 has 2 partitions: [2020-01-01, 2020-01-03), [2020-01-03, 2020-01-05)
- t2 has 2 partitions: [2020-01-01, 2020-01-02), [2020-01-02, 2020-01-03)

However, if the materialized view uses monthly partition roll-up, the second scenario is also allowed, because the materialized view only needs to generate one partition: [2020-01-01, 2020-02-01).

---------

Co-authored-by: seawinde <daydayup005@yeah.net>
Co-authored-by: seawinde <wusi@selectdb.com>
1 parent 447d535 commit a0ecbae

File tree

90 files changed

+6550
-999
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+6550
-999
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
2222
import org.apache.doris.common.AnalysisException;
2323
import org.apache.doris.common.Config;
24-
import org.apache.doris.common.Pair;
2524
import org.apache.doris.common.util.PropertyAnalyzer;
2625
import org.apache.doris.datasource.CatalogMgr;
2726
import org.apache.doris.info.TableNameInfo;
2827
import org.apache.doris.job.common.TaskStatus;
2928
import org.apache.doris.job.extensions.mtmv.MTMVTask;
29+
import org.apache.doris.mtmv.BaseTableInfo;
3030
import org.apache.doris.mtmv.EnvInfo;
3131
import org.apache.doris.mtmv.MTMVCache;
3232
import org.apache.doris.mtmv.MTMVJobInfo;
@@ -40,17 +40,21 @@
4040
import org.apache.doris.mtmv.MTMVRefreshInfo;
4141
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
4242
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
43+
import org.apache.doris.mtmv.MTMVRelatedTableIf;
4344
import org.apache.doris.mtmv.MTMVRelation;
45+
import org.apache.doris.mtmv.MTMVSnapshotIf;
4446
import org.apache.doris.mtmv.MTMVStatus;
4547
import org.apache.doris.qe.ConnectContext;
4648

4749
import com.google.common.collect.Maps;
4850
import com.google.common.collect.Sets;
4951
import com.google.gson.annotations.SerializedName;
52+
import org.apache.commons.collections.MapUtils;
5053
import org.apache.commons.lang3.StringUtils;
5154
import org.apache.logging.log4j.LogManager;
5255
import org.apache.logging.log4j.Logger;
5356

57+
import java.io.IOException;
5458
import java.util.Map;
5559
import java.util.Map.Entry;
5660
import java.util.Optional;
@@ -242,6 +246,7 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
242246
this.status.setRefreshState(MTMVRefreshState.FAIL);
243247
}
244248
this.jobInfo.addHistoryTask(task);
249+
compatiblePctSnapshot(partitionSnapshots);
245250
this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
246251
Env.getCurrentEnv().getMtmvService()
247252
.refreshComplete(this, relation, task);
@@ -397,64 +402,26 @@ public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
397402
return result;
398403
}
399404

400-
/**
401-
* Calculate the partition and associated partition mapping relationship of the MTMV
402-
* It is the result of real-time comparison calculation, so there may be some costs,
403-
* so it should be called with caution.
404-
* The reason for not directly calling `calculatePartitionMappings` and
405-
* generating a reverse index is to directly generate two maps here,
406-
* without the need to traverse them again
407-
*
408-
* @return mvPartitionName ==> relationPartitionNames and relationPartitionName ==> mvPartitionName
409-
* @throws AnalysisException
410-
*/
411-
public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartitionMappings()
412-
throws AnalysisException {
413-
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
414-
return Pair.of(Maps.newHashMap(), Maps.newHashMap());
415-
}
416-
long start = System.currentTimeMillis();
417-
Map<String, Set<String>> mvToBase = Maps.newHashMap();
418-
Map<String, String> baseToMv = Maps.newHashMap();
419-
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
420-
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
421-
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
422-
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
423-
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
424-
Sets.newHashSet());
425-
String mvPartitionName = entry.getKey();
426-
mvToBase.put(mvPartitionName, basePartitionNames);
427-
for (String basePartitionName : basePartitionNames) {
428-
baseToMv.put(basePartitionName, mvPartitionName);
429-
}
430-
}
431-
if (LOG.isDebugEnabled()) {
432-
LOG.debug("calculateDoublyPartitionMappings use [{}] mills, mvName is [{}]",
433-
System.currentTimeMillis() - start, name);
434-
}
435-
return Pair.of(mvToBase, baseToMv);
436-
}
437-
438405
/**
439406
* Calculate the partition and associated partition mapping relationship of the MTMV
440407
* It is the result of real-time comparison calculation, so there may be some costs,
441408
* so it should be called with caution
442409
*
443-
* @return mvPartitionName ==> relationPartitionNames
410+
* @return mvPartitionName ==> pctTable ==> pctPartitionName
444411
* @throws AnalysisException
445412
*/
446-
public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisException {
413+
public Map<String, Map<MTMVRelatedTableIf, Set<String>>> calculatePartitionMappings() throws AnalysisException {
447414
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
448415
return Maps.newHashMap();
449416
}
450417
long start = System.currentTimeMillis();
451-
Map<String, Set<String>> res = Maps.newHashMap();
452-
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
453-
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
418+
Map<String, Map<MTMVRelatedTableIf, Set<String>>> res = Maps.newHashMap();
419+
Map<PartitionKeyDesc, Map<MTMVRelatedTableIf, Set<String>>> pctPartitionDescs = MTMVPartitionUtil
420+
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties, getPartitionColumns());
454421
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
455422
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
456423
res.put(entry.getKey(),
457-
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
424+
pctPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Maps.newHashMap()));
458425
}
459426
if (LOG.isDebugEnabled()) {
460427
LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]",
@@ -580,4 +547,28 @@ private void compatibleInternal(CatalogMgr catalogMgr) throws Exception {
580547
refreshSnapshot.compatible(this);
581548
}
582549
}
550+
551+
@Override
552+
public void gsonPostProcess() throws IOException {
553+
super.gsonPostProcess();
554+
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = refreshSnapshot.getPartitionSnapshots();
555+
compatiblePctSnapshot(partitionSnapshots);
556+
}
557+
558+
private void compatiblePctSnapshot(Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
559+
BaseTableInfo relatedTableInfo = mvPartitionInfo.getRelatedTableInfo();
560+
if (relatedTableInfo == null) {
561+
return;
562+
}
563+
if (MapUtils.isEmpty(partitionSnapshots)) {
564+
return;
565+
}
566+
for (MTMVRefreshPartitionSnapshot partitionSnapshot : partitionSnapshots.values()) {
567+
Map<String, MTMVSnapshotIf> partitions = partitionSnapshot.getPartitions();
568+
Map<BaseTableInfo, Map<String, MTMVSnapshotIf>> pcts = partitionSnapshot.getPcts();
569+
if (!MapUtils.isEmpty(partitions) && MapUtils.isEmpty(pcts)) {
570+
pcts.put(relatedTableInfo, partitions);
571+
}
572+
}
573+
}
583574
}

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3430,7 +3430,7 @@ public List<Column> getPartitionColumns() {
34303430
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
34313431
Optional<MvccSnapshot> snapshot)
34323432
throws AnalysisException {
3433-
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
3433+
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions(this);
34343434
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
34353435
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
34363436
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();

fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.doris.job.exception.JobException;
4545
import org.apache.doris.job.task.AbstractTask;
4646
import org.apache.doris.metric.MetricRepo;
47+
import org.apache.doris.mtmv.BaseColInfo;
4748
import org.apache.doris.mtmv.BaseTableInfo;
4849
import org.apache.doris.mtmv.MTMVBaseTableIf;
4950
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
@@ -209,12 +210,14 @@ public void run() throws JobException {
209210
MTMVPlanUtil.ensureMTMVQueryUsable(mtmv, ctx);
210211
}
211212
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
212-
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
213-
if (!relatedTable.isValidRelatedTable()) {
214-
throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
215-
+ " is not a valid related table anymore, stop refreshing."
216-
+ " e.g. Table has multiple partition columns"
217-
+ " or including not supported transform functions.");
213+
Set<MTMVRelatedTableIf> pctTables = mtmv.getMvPartitionInfo().getPctTables();
214+
for (MTMVRelatedTableIf pctTable : pctTables) {
215+
if (!pctTable.isValidRelatedTable()) {
216+
throw new JobException("MTMV " + mtmv.getName() + "'s pct table " + pctTable.getName()
217+
+ " is not a valid pct table anymore, stop refreshing."
218+
+ " e.g. Table has multiple partition columns"
219+
+ " or including not supported transform functions.");
220+
}
218221
}
219222
syncPartitions = MTMVPartitionUtil.alignMvPartition(mtmv);
220223
}
@@ -578,8 +581,11 @@ protected void closeOrReleaseResources() {
578581
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {
579582
Map<TableIf, String> tableWithPartKey = Maps.newHashMap();
580583
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
581-
tableWithPartKey
582-
.put(mtmv.getMvPartitionInfo().getRelatedTable(), mtmv.getMvPartitionInfo().getRelatedCol());
584+
List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
585+
for (BaseColInfo pctInfo : pctInfos) {
586+
tableWithPartKey
587+
.put(MTMVUtil.getTable(pctInfo.getTableInfo()), pctInfo.getColName());
588+
}
583589
}
584590
return tableWithPartKey;
585591
}
fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseColInfo.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.mtmv;
19+
20+
import com.google.gson.annotations.SerializedName;
21+
22+
import java.util.Objects;
23+
24+
public class BaseColInfo {
25+
@SerializedName("ti")
26+
private BaseTableInfo tableInfo;
27+
@SerializedName("rn")
28+
private String colName;
29+
30+
public BaseColInfo(String colName, BaseTableInfo tableInfo) {
31+
this.colName = colName;
32+
this.tableInfo = tableInfo;
33+
}
34+
35+
public String getColName() {
36+
return colName;
37+
}
38+
39+
public void setColName(String colName) {
40+
this.colName = colName;
41+
}
42+
43+
public BaseTableInfo getTableInfo() {
44+
return tableInfo;
45+
}
46+
47+
public void setTableInfo(BaseTableInfo tableInfo) {
48+
this.tableInfo = tableInfo;
49+
}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (o == null || getClass() != o.getClass()) {
54+
return false;
55+
}
56+
BaseColInfo that = (BaseColInfo) o;
57+
return Objects.equals(tableInfo, that.tableInfo) && Objects.equals(colName, that.colName);
58+
}
59+
60+
@Override
61+
public int hashCode() {
62+
return Objects.hash(tableInfo, colName);
63+
}
64+
65+
@Override
66+
public String toString() {
67+
final StringBuilder sb = new StringBuilder("BaseColInfo{");
68+
sb.append("colName='").append(colName).append('\'');
69+
sb.append(", tableInfo=").append(tableInfo);
70+
sb.append('}');
71+
return sb.toString();
72+
}
73+
}

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.doris.mtmv;
1919

20+
import com.google.common.collect.Maps;
21+
2022
import java.util.Map;
2123

2224
public class MTMVBaseVersions {
2325
private final Map<Long, Long> tableVersions;
24-
private final Map<String, Long> partitionVersions;
26+
private final Map<MTMVRelatedTableIf, Map<String, Long>> partitionVersions;
2527

26-
public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long> partitionVersions) {
28+
public MTMVBaseVersions(Map<Long, Long> tableVersions,
29+
Map<MTMVRelatedTableIf, Map<String, Long>> partitionVersions) {
2730
this.tableVersions = tableVersions;
2831
this.partitionVersions = partitionVersions;
2932
}
@@ -32,7 +35,7 @@ public Map<Long, Long> getTableVersions() {
3235
return tableVersions;
3336
}
3437

35-
public Map<String, Long> getPartitionVersions() {
36-
return partitionVersions;
38+
public Map<String, Long> getPartitionVersions(MTMVRelatedTableIf mtmvRelatedTableIf) {
39+
return partitionVersions.getOrDefault(mtmvRelatedTableIf, Maps.newHashMap());
3740
}
3841
}

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionCheckUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public static Pair<Boolean, String> compareDynamicPartition(OlapTable originalTa
120120
@VisibleForTesting
121121
public static Pair<Boolean, String> compareAutoPartition(OlapTable originalTable,
122122
OlapTable relatedTable) throws AnalysisException {
123-
if (!isDynamicPartition(relatedTable)) {
123+
if (!isAutoPartition(relatedTable)) {
124124
return Pair.of(false, "relatedTable is not dynamic partition.");
125125
}
126126
FunctionIntervalInfo originalFunctionIntervalInfo = PartitionExprUtil.getFunctionIntervalInfo(

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,21 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException
6969
throw new AnalysisException(
7070
String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits));
7171
}
72-
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
73-
PartitionType partitionType = relatedTable.getPartitionType(MvccUtil.getSnapshotFromContext(relatedTable));
74-
if (partitionType == PartitionType.RANGE) {
75-
Type partitionColumnType = MTMVPartitionUtil
76-
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
77-
if (!partitionColumnType.isDateType()) {
78-
throw new AnalysisException(
79-
"partitionColumnType should be date/datetime "
80-
+ "when PartitionType is range and expr is date_trunc");
72+
List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
73+
for (BaseColInfo pctInfo : pctInfos) {
74+
MTMVRelatedTableIf pctTable = MTMVUtil.getRelatedTable(pctInfo.getTableInfo());
75+
PartitionType partitionType = pctTable.getPartitionType(MvccUtil.getSnapshotFromContext(pctTable));
76+
if (partitionType == PartitionType.RANGE) {
77+
Type partitionColumnType = MTMVPartitionUtil
78+
.getPartitionColumnType(pctTable, pctInfo.getColName());
79+
if (!partitionColumnType.isDateType()) {
80+
throw new AnalysisException(
81+
"partitionColumnType should be date/datetime "
82+
+ "when PartitionType is range and expr is date_trunc");
83+
}
84+
} else {
85+
throw new AnalysisException("date_trunc only support range partition");
8186
}
82-
} else {
83-
throw new AnalysisException("date_trunc only support range partition");
8487
}
8588
}
8689

@@ -125,9 +128,9 @@ private Optional<String> getDateFormat(Map<String, String> mvProperties) {
125128

126129
@Override
127130
public PartitionKeyDesc generateRollUpPartitionKeyDesc(PartitionKeyDesc partitionKeyDesc,
128-
MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
131+
MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable) throws AnalysisException {
129132
Type partitionColumnType = MTMVPartitionUtil
130-
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
133+
.getPartitionColumnType(pctTable, mvPartitionInfo.getPartitionColByPctTable(pctTable));
131134
// mtmv only support one partition column
132135
Preconditions.checkState(partitionKeyDesc.getLowerValues().size() == 1,
133136
"only support one partition column");
@@ -243,4 +246,18 @@ private String dateTimeToStr(DateTimeV2Literal literal,
243246
"MTMV not support partition with column type : " + partitionColumnType);
244247
}
245248
}
249+
250+
@Override
251+
public boolean equals(Object o) {
252+
if (o == null || getClass() != o.getClass()) {
253+
return false;
254+
}
255+
MTMVPartitionExprDateTrunc that = (MTMVPartitionExprDateTrunc) o;
256+
return Objects.equals(timeUnit, that.timeUnit);
257+
}
258+
259+
@Override
260+
public int hashCode() {
261+
return Objects.hashCode(timeUnit);
262+
}
246263
}

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map<String, String>
4747
* @throws AnalysisException
4848
*/
4949
PartitionKeyDesc generateRollUpPartitionKeyDesc(
50-
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo)
50+
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable)
5151
throws AnalysisException;
5252

5353
/**

0 commit comments

Comments
 (0)