Skip to content

Commit 4fbb3a0

Browse files
dataroaringclaude
andcommitted
[feature](scan) Add read_mor_as_dup_tables session variable to read MOR tables without merge
Add a per-table session variable that allows reading MOR (Merge-On-Read) UNIQUE tables as DUP tables, skipping storage engine merge and delete sign filter while still honoring delete predicates. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8079739 commit 4fbb3a0

File tree

11 files changed

+449
-4
lines changed

11 files changed

+449
-4
lines changed

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,13 @@ bool OlapScanLocalState::_storage_no_merge() {
442442
return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
443443
(p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
444444
p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
445-
p._olap_scan_node.enable_unique_key_merge_on_write));
445+
p._olap_scan_node.enable_unique_key_merge_on_write)) ||
446+
_read_mor_as_dup();
447+
}
448+
449+
bool OlapScanLocalState::_read_mor_as_dup() {
450+
auto& p = _parent->cast<OlapScanOperatorX>();
451+
return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup;
446452
}
447453

448454
Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
105105

106106
bool _storage_no_merge() override;
107107

108+
bool _read_mor_as_dup();
108109
bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override {
109110
if (!predicate.target_is_slot(_parent->node_id())) {
110111
return false;

be/src/vec/exec/scan/olap_scanner.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,10 @@ Status OlapScanner::_init_tablet_reader_params(
336336
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
337337
const bool single_version = _tablet_reader_params.has_single_version();
338338

339-
if (_state->skip_storage_engine_merge()) {
339+
auto* olap_local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state);
340+
bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
341+
olap_local_state->olap_scan_node().read_mor_as_dup;
342+
if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
340343
_tablet_reader_params.direct_mode = true;
341344
_tablet_reader_params.aggregation = true;
342345
} else {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,10 @@ private Expression generateAggFunction(SlotReference slot, Column column) {
370370
public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, ConnectContext connectContext,
371371
OlapTable olapTable) {
372372
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
373-
&& !connectContext.getSessionVariable().skipDeleteSign()) {
373+
&& !connectContext.getSessionVariable().skipDeleteSign()
374+
&& !(olapTable.isMorTable()
375+
&& connectContext.getSessionVariable().isReadMorAsDupEnabled(
376+
olapTable.getQualifiedDbName(), olapTable.getName()))) {
374377
// table qualifier is catalog.db.table, we make db.table.column
375378
Slot deleteSlot = null;
376379
for (Slot slot : scan.getOutput()) {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
5454
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
5555
import org.apache.doris.nereids.util.ExpressionUtils;
56+
import org.apache.doris.qe.ConnectContext;
5657

5758
import com.google.common.base.Preconditions;
5859
import com.google.common.collect.Lists;
@@ -151,7 +152,13 @@ public Plan visitLogicalOlapScan(LogicalOlapScan logicalOlapScan, Stack<PreAggIn
151152
long selectIndexId = logicalOlapScan.getSelectedIndexId();
152153
MaterializedIndexMeta meta = logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId);
153154
if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType() == KeysType.UNIQUE_KEYS
154-
&& logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) {
155+
&& logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())
156+
|| (meta.getKeysType() == KeysType.UNIQUE_KEYS
157+
&& logicalOlapScan.getTable().isMorTable()
158+
&& ConnectContext.get() != null
159+
&& ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
160+
logicalOlapScan.getTable().getQualifiedDbName(),
161+
logicalOlapScan.getTable().getName()))) {
155162
return logicalOlapScan.withPreAggStatus(PreAggStatus.on());
156163
} else {
157164
if (context.empty()) {

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,13 @@ AGGREGATE KEY (siteid,citycode,username)
752752
&& getTable().getKeysType() == KeysType.UNIQUE_KEYS) {
753753
return;
754754
}
755+
// When readMorAsDup is enabled, MOR tables are read as DUP, so uniqueness cannot be guaranteed.
756+
if (getTable().getKeysType() == KeysType.UNIQUE_KEYS
757+
&& getTable().isMorTable()
758+
&& ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
759+
getTable().getQualifiedDbName(), getTable().getName())) {
760+
return;
761+
}
755762
ImmutableSet.Builder<Slot> uniqSlots = ImmutableSet.builderWithExpectedSize(outputSet.size());
756763
for (Slot slot : outputSet) {
757764
if (!(slot instanceof SlotReference)) {

fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,10 @@ protected void toThrift(TPlanNode msg) {
11881188
boolean enabled = ConnectContext.get().getSessionVariable()
11891189
.isMorValuePredicatePushdownEnabled(dbName, tblName);
11901190
msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
1191+
if (ConnectContext.get().getSessionVariable()
1192+
.isReadMorAsDupEnabled(dbName, tblName)) {
1193+
msg.olap_scan_node.setReadMorAsDup(true);
1194+
}
11911195
}
11921196

11931197
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,8 @@ public class SessionVariable implements Serializable, Writable {
725725
public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
726726
= "enable_mor_value_predicate_pushdown_tables";
727727

728+
public static final String READ_MOR_AS_DUP_TABLES = "read_mor_as_dup_tables";
729+
728730
// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
729731
// this session variable is set to true.
730732
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
@@ -2191,6 +2193,14 @@ public boolean isEnableHboNonStrictMatchingMode() {
21912193
+ "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
21922194
public String enableMorValuePredicatePushdownTables = "";
21932195

2196+
// Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter).
2197+
@VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true,
2198+
affectQueryResultInPlan = true, description = {
2199+
"指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。",
2200+
"Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter). "
2201+
+ "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
2202+
public String readMorAsDupTables = "";
2203+
21942204
// Whether drop table when create table as select insert data appear error.
21952205
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
21962206
public boolean dropTableIfCtasFailed = true;
@@ -4698,6 +4708,24 @@ public boolean isMorValuePredicatePushdownEnabled(String dbName, String tableNam
46984708
return false;
46994709
}
47004710

4711+
public boolean isReadMorAsDupEnabled(String dbName, String tableName) {
4712+
if (readMorAsDupTables == null || readMorAsDupTables.isEmpty()) {
4713+
return false;
4714+
}
4715+
String trimmed = readMorAsDupTables.trim();
4716+
if ("*".equals(trimmed)) {
4717+
return true;
4718+
}
4719+
String fullName = dbName + "." + tableName;
4720+
for (String table : trimmed.split(",")) {
4721+
if (table.trim().equalsIgnoreCase(fullName)
4722+
|| table.trim().equalsIgnoreCase(tableName)) {
4723+
return true;
4724+
}
4725+
}
4726+
return false;
4727+
}
4728+
47014729
/** canUseNereidsDistributePlanner */
47024730
public static boolean canUseNereidsDistributePlanner() {
47034731
ConnectContext connectContext = ConnectContext.get();

0 commit comments

Comments
 (0)