Skip to content

Commit b0aa80e

Browse files
dataroaring and claude
committed
[feature](scan) Add read_mor_as_dup_tables session variable to read MOR tables without merge
Add a per-table session variable that allows reading MOR (Merge-On-Read) UNIQUE tables as DUP tables, skipping storage engine merge and delete sign filter while still honoring delete predicates. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8079739 commit b0aa80e

File tree

10 files changed

+182
-4
lines changed

10 files changed

+182
-4
lines changed

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,13 @@ bool OlapScanLocalState::_storage_no_merge() {
442442
return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
443443
(p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
444444
p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
445-
p._olap_scan_node.enable_unique_key_merge_on_write));
445+
p._olap_scan_node.enable_unique_key_merge_on_write)) ||
446+
_read_mor_as_dup();
447+
}
448+
449+
bool OlapScanLocalState::_read_mor_as_dup() {
450+
auto& p = _parent->cast<OlapScanOperatorX>();
451+
return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup;
446452
}
447453

448454
Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
105105

106106
bool _storage_no_merge() override;
107107

108+
bool _read_mor_as_dup();
108109
bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override {
109110
if (!predicate.target_is_slot(_parent->node_id())) {
110111
return false;

be/src/vec/exec/scan/olap_scanner.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,10 @@ Status OlapScanner::_init_tablet_reader_params(
336336
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
337337
const bool single_version = _tablet_reader_params.has_single_version();
338338

339-
if (_state->skip_storage_engine_merge()) {
339+
auto* olap_local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state);
340+
bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
341+
olap_local_state->olap_scan_node().read_mor_as_dup;
342+
if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
340343
_tablet_reader_params.direct_mode = true;
341344
_tablet_reader_params.aggregation = true;
342345
} else {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,10 @@ private Expression generateAggFunction(SlotReference slot, Column column) {
370370
public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, ConnectContext connectContext,
371371
OlapTable olapTable) {
372372
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
373-
&& !connectContext.getSessionVariable().skipDeleteSign()) {
373+
&& !connectContext.getSessionVariable().skipDeleteSign()
374+
&& !(olapTable.isMorTable()
375+
&& connectContext.getSessionVariable().isReadMorAsDupEnabled(
376+
olapTable.getQualifiedDbName(), olapTable.getName()))) {
374377
// table qualifier is catalog.db.table, we make db.table.column
375378
Slot deleteSlot = null;
376379
for (Slot slot : scan.getOutput()) {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
5454
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
5555
import org.apache.doris.nereids.util.ExpressionUtils;
56+
import org.apache.doris.qe.ConnectContext;
5657

5758
import com.google.common.base.Preconditions;
5859
import com.google.common.collect.Lists;
@@ -151,7 +152,13 @@ public Plan visitLogicalOlapScan(LogicalOlapScan logicalOlapScan, Stack<PreAggIn
151152
long selectIndexId = logicalOlapScan.getSelectedIndexId();
152153
MaterializedIndexMeta meta = logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId);
153154
if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType() == KeysType.UNIQUE_KEYS
154-
&& logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) {
155+
&& logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())
156+
|| (meta.getKeysType() == KeysType.UNIQUE_KEYS
157+
&& logicalOlapScan.getTable().isMorTable()
158+
&& ConnectContext.get() != null
159+
&& ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
160+
logicalOlapScan.getTable().getQualifiedDbName(),
161+
logicalOlapScan.getTable().getName()))) {
155162
return logicalOlapScan.withPreAggStatus(PreAggStatus.on());
156163
} else {
157164
if (context.empty()) {

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,13 @@ AGGREGATE KEY (siteid,citycode,username)
752752
&& getTable().getKeysType() == KeysType.UNIQUE_KEYS) {
753753
return;
754754
}
755+
// When readMorAsDup is enabled, MOR tables are read as DUP, so uniqueness cannot be guaranteed.
756+
if (getTable().getKeysType() == KeysType.UNIQUE_KEYS
757+
&& getTable().isMorTable()
758+
&& ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
759+
getTable().getQualifiedDbName(), getTable().getName())) {
760+
return;
761+
}
755762
ImmutableSet.Builder<Slot> uniqSlots = ImmutableSet.builderWithExpectedSize(outputSet.size());
756763
for (Slot slot : outputSet) {
757764
if (!(slot instanceof SlotReference)) {

fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,10 @@ protected void toThrift(TPlanNode msg) {
11881188
boolean enabled = ConnectContext.get().getSessionVariable()
11891189
.isMorValuePredicatePushdownEnabled(dbName, tblName);
11901190
msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
1191+
if (ConnectContext.get().getSessionVariable()
1192+
.isReadMorAsDupEnabled(dbName, tblName)) {
1193+
msg.olap_scan_node.setReadMorAsDup(true);
1194+
}
11911195
}
11921196

11931197
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,8 @@ public class SessionVariable implements Serializable, Writable {
725725
public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
726726
= "enable_mor_value_predicate_pushdown_tables";
727727

728+
public static final String READ_MOR_AS_DUP_TABLES = "read_mor_as_dup_tables";
729+
728730
// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
729731
// this session variable is set to true.
730732
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
@@ -2191,6 +2193,14 @@ public boolean isEnableHboNonStrictMatchingMode() {
21912193
+ "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
21922194
public String enableMorValuePredicatePushdownTables = "";
21932195

2196+
// Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter).
2197+
@VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true,
2198+
affectQueryResultInPlan = true, description = {
2199+
"指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。",
2200+
"Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter). "
2201+
+ "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
2202+
public String readMorAsDupTables = "";
2203+
21942204
// Whether drop table when create table as select insert data appear error.
21952205
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
21962206
public boolean dropTableIfCtasFailed = true;
@@ -4698,6 +4708,24 @@ public boolean isMorValuePredicatePushdownEnabled(String dbName, String tableNam
46984708
return false;
46994709
}
47004710

4711+
public boolean isReadMorAsDupEnabled(String dbName, String tableName) {
4712+
if (readMorAsDupTables == null || readMorAsDupTables.isEmpty()) {
4713+
return false;
4714+
}
4715+
String trimmed = readMorAsDupTables.trim();
4716+
if ("*".equals(trimmed)) {
4717+
return true;
4718+
}
4719+
String fullName = dbName + "." + tableName;
4720+
for (String table : trimmed.split(",")) {
4721+
if (table.trim().equalsIgnoreCase(fullName)
4722+
|| table.trim().equalsIgnoreCase(tableName)) {
4723+
return true;
4724+
}
4725+
}
4726+
return false;
4727+
}
4728+
47014729
/** canUseNereidsDistributePlanner */
47024730
public static boolean canUseNereidsDistributePlanner() {
47034731
ConnectContext connectContext = ConnectContext.get();

gensrc/thrift/PlanNodes.thrift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,8 @@ struct TOlapScanNode {
889889
22: optional i64 ann_sort_limit
890890
// Enable value predicate pushdown for MOR tables
891891
23: optional bool enable_mor_value_predicate_pushdown
892+
// Read MOR table as DUP table: skip storage engine merge and the delete sign filter (delete predicates are still honored)
893+
24: optional bool read_mor_as_dup
892894
}
893895

894896
struct TEqJoinCondition {
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_read_mor_as_dup") {
19+
def tableName = "test_read_mor_as_dup_tbl"
20+
def tableName2 = "test_read_mor_as_dup_tbl2"
21+
22+
sql "DROP TABLE IF EXISTS ${tableName}"
23+
sql "DROP TABLE IF EXISTS ${tableName2}"
24+
25+
// Create a MOR (Merge-On-Read) UNIQUE table
26+
sql """
27+
CREATE TABLE ${tableName} (
28+
`k` int NOT NULL,
29+
`v1` int NULL,
30+
`v2` varchar(100) NULL
31+
) ENGINE=OLAP
32+
UNIQUE KEY(`k`)
33+
DISTRIBUTED BY HASH(`k`) BUCKETS 1
34+
PROPERTIES (
35+
"enable_unique_key_merge_on_write" = "false",
36+
"replication_num" = "1"
37+
);
38+
"""
39+
40+
// Create a second MOR table for per-table control test
41+
sql """
42+
CREATE TABLE ${tableName2} (
43+
`k` int NOT NULL,
44+
`v1` int NULL
45+
) ENGINE=OLAP
46+
UNIQUE KEY(`k`)
47+
DISTRIBUTED BY HASH(`k`) BUCKETS 1
48+
PROPERTIES (
49+
"enable_unique_key_merge_on_write" = "false",
50+
"replication_num" = "1"
51+
);
52+
"""
53+
54+
// Insert multiple versions of the same key
55+
sql "INSERT INTO ${tableName} VALUES (1, 10, 'first');"
56+
sql "INSERT INTO ${tableName} VALUES (1, 20, 'second');"
57+
sql "INSERT INTO ${tableName} VALUES (1, 30, 'third');"
58+
sql "INSERT INTO ${tableName} VALUES (2, 100, 'only_version');"
59+
60+
// Insert a row and then delete it
61+
sql "INSERT INTO ${tableName} VALUES (3, 50, 'to_be_deleted');"
62+
sql "DELETE FROM ${tableName} WHERE k = 3;"
63+
64+
// Insert into second table
65+
sql "INSERT INTO ${tableName2} VALUES (1, 10);"
66+
sql "INSERT INTO ${tableName2} VALUES (1, 20);"
67+
68+
// === Test 1: Normal query returns merged result ===
69+
sql "SET read_mor_as_dup_tables = '';"
70+
def normalResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
71+
// Should see only latest version per key: (1,30,'third'), (2,100,'only_version')
72+
// Key 3 was deleted, should not appear
73+
assertTrue(normalResult.size() == 2, "Normal query should return 2 rows (merged), got ${normalResult.size()}")
74+
75+
// === Test 2: Wildcard — read all MOR tables as DUP ===
76+
sql "SET read_mor_as_dup_tables = '*';"
77+
def dupResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
78+
// Should see all row versions for key 1 (3 versions) + key 2 (1 version)
79+
// Key 3: delete predicates are still honored in DUP mode, so a row removed via a delete predicate stays filtered.
80+
// Only the delete sign filter is skipped, so a row removed via a delete-sign write would become visible.
81+
// For MOR tables, DELETE adds a delete predicate in rowsets, so key 3 is expected to stay hidden; the ">= 4" below tolerates either case.
82+
assertTrue(dupResult.size() >= 4, "DUP mode should return at least 4 rows (all versions), got ${dupResult.size()}")
83+
84+
// Verify key 1 has multiple versions
85+
def key1Rows = dupResult.findAll { it[0] == 1 }
86+
assertTrue(key1Rows.size() == 3, "Key 1 should have 3 versions in DUP mode, got ${key1Rows.size()}")
87+
88+
// === Test 3: Per-table control with db.table format ===
89+
def dbName = sql "SELECT DATABASE();"
90+
def currentDb = dbName[0][0]
91+
92+
sql "SET read_mor_as_dup_tables = '${currentDb}.${tableName}';"
93+
94+
// tableName should be in DUP mode
95+
def perTableResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
96+
assertTrue(perTableResult.size() >= 4, "Per-table DUP mode should return all versions, got ${perTableResult.size()}")
97+
98+
// tableName2 should still be in normal (merged) mode
99+
def table2Result = sql "SELECT * FROM ${tableName2} ORDER BY k;"
100+
assertTrue(table2Result.size() == 1, "Table2 should still be merged (1 row), got ${table2Result.size()}")
101+
102+
// === Test 4: Per-table control with just table name ===
103+
sql "SET read_mor_as_dup_tables = '${tableName2}';"
104+
105+
// tableName should now be in normal mode
106+
def revertedResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
107+
assertTrue(revertedResult.size() == 2, "Table1 should be merged again (2 rows), got ${revertedResult.size()}")
108+
109+
// tableName2 should be in DUP mode
110+
def table2DupResult = sql "SELECT * FROM ${tableName2} ORDER BY k, v1;"
111+
assertTrue(table2DupResult.size() == 2, "Table2 in DUP mode should return 2 rows, got ${table2DupResult.size()}")
112+
113+
// === Cleanup ===
114+
sql "SET read_mor_as_dup_tables = '';"
115+
sql "DROP TABLE IF EXISTS ${tableName}"
116+
sql "DROP TABLE IF EXISTS ${tableName2}"
117+
}

0 commit comments

Comments
 (0)