From f4b5d42194167050dc57321e79e601349d2bab12 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 21 May 2026 14:21:53 +0800 Subject: [PATCH 1/7] [fix] avoid concurrent tablet stat iteration failures --- .../org/apache/doris/catalog/LocalTablet.java | 2 +- .../doris/catalog/MaterializedIndex.java | 2 +- .../apache/doris/catalog/TabletStatMgr.java | 9 +- ...stat_mgr_concurrent_partition_churn.groovy | 169 ++++++++++++++++++ 4 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 14d9171f3ff0e4..17f3911bfc62b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -254,7 +254,7 @@ public void addReplica(Replica replica, boolean isRestore) { @Override public List getReplicas() { - return this.replicas; + return Lists.newArrayList(this.replicas); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 2e98354acef886..d18c670786934b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -94,7 +94,7 @@ public MaterializedIndex(long id, IndexState state) { } public List getTablets() { - return tablets; + return Lists.newArrayList(tablets); } public List getTabletIdsInOrder() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index bcf74528da6fd9..3b40b1315cc6b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -382,7 +382,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (result.isSetTabletStatList()) { for (TTabletStat stat : result.getTabletStatList()) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { - Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(stat.getTabletId(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}", + stat.getTabletId(), beId, e); + continue; + } if (replica != null) { replica.setDataSize(stat.getDataSize()); replica.setRemoteDataSize(stat.getRemoteDataSize()); diff --git a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy new file mode 100644 index 00000000000000..d199a820350ef0 --- /dev/null +++ b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.io.RandomAccessFile +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { + String dbName = context.config.getDbNameByFile(context.file) + sql "select 1" + + String tableName = "test_issue_59138" + String oldInterval = null + AtomicBoolean stopped = new AtomicBoolean(false) + AtomicReference firstError = new AtomicReference<>() + + String dorisHome = System.getProperty("DORIS_HOME") + if (dorisHome == null || dorisHome.isEmpty()) { + dorisHome = context.config.suitePath.replace("/regression-test/suites", "") + } + + File feLog = new File("${dorisHome}/fe/log/fe.log") + File feWarnLog = new File("${dorisHome}/fe/log/fe.warn.log") + long feLogOffset = feLog.exists() ? feLog.length() : 0L + long feWarnLogOffset = feWarnLog.exists() ? feWarnLog.length() : 0L + + def readAppendedLog = { File file, long offset -> + if (!file.exists()) { + return "" + } + RandomAccessFile raf = new RandomAccessFile(file, "r") + try { + long safeOffset = Math.min(offset, raf.length()) + raf.seek(safeOffset) + int size = (int) (raf.length() - safeOffset) + if (size <= 0) { + return "" + } + byte[] bytes = new byte[size] + raf.readFully(bytes) + return new String(bytes, "UTF-8") + } finally { + raf.close() + } + } + + def failIfNeeded = { + if (firstError.get() != null) { + throw firstError.get() + } + } + + def startWorker = { Closure body -> + Thread.startDaemon { + try { + connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) { + sql "use ${dbName}" + body() + } + } catch (Throwable t) { + logger.warn("tablet stat mgr concurrency worker failed", t) + firstError.compareAndSet(null, t) + stopped.set(true) + } + } + } + + try { + def configRow = sql """ ADMIN SHOW FRONTEND CONFIG LIKE 'tablet_stat_update_interval_second' """ + oldInterval = configRow[0][1] + sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "1") """ + + sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ + sql """ + CREATE TABLE ${tableName} ( + `k1` INT NOT NULL, + `k2` INT NOT NULL + ) + DUPLICATE KEY(`k1`) + PARTITION BY RANGE(`k1`) ( + PARTITION p0 VALUES LESS THAN ("100"), + PARTITION p1 VALUES LESS THAN ("200") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """ INSERT INTO ${tableName} VALUES (1, 1), (101, 1) """ + + def insertWorker = startWorker { + int i = 0 + while (!stopped.get()) { + int left = i % 90 + int right = 100 + (i % 90) + sql """ INSERT INTO ${tableName} VALUES (${left}, ${i}), (${right}, ${i}) """ + i++ + } + } + + def partitionWorker = startWorker { + int i = 0 + while (!stopped.get()) { + String partitionName = "p_dyn_${i}" + int upperBound = 300 + i * 100 + sql """ ALTER TABLE ${tableName} ADD PARTITION ${partitionName} VALUES LESS THAN ("${upperBound}") """ + sql """ ALTER TABLE ${tableName} DROP PARTITION IF EXISTS ${partitionName} FORCE """ + i++ + } + } + + def tableWorker = startWorker { + int i = 0 + while (!stopped.get()) { + String tempTable = "tmp_issue_59138_${i}" + sql """ + CREATE TABLE ${tempTable} ( + `k1` INT NOT NULL, + `k2` INT NOT NULL + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + sql """ DROP TABLE IF EXISTS ${tempTable} FORCE """ + i++ + } + } + + sleep(12000) + stopped.set(true) + [insertWorker, partitionWorker, tableWorker].each { Thread thread -> + thread.join(10000) + } + + failIfNeeded() + sql "sync" + def rowCount = sql """ SELECT count(*) FROM ${tableName} """ + assertTrue(rowCount[0][0].toLong() > 0L) + + sleep(3000) + String appendedLogs = readAppendedLog(feLog, feLogOffset) + readAppendedLog(feWarnLog, feWarnLogOffset) + assertFalse(appendedLogs.contains("ConcurrentModificationException")) + assertFalse(appendedLogs.contains("daemon thread got exception. name: tablet stat mgr")) + } finally { + stopped.set(true) + if (oldInterval != null) { + sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "${oldInterval}") """ + } + sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ + } +} From 2f42b6d6a8327eb0cc998addc172d05ae303a8ba Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 21 May 2026 14:21:53 +0800 Subject: [PATCH 2/7] optimize --- .../java/org/apache/doris/catalog/TabletStatMgr.java | 9 ++++++++- ...est_tablet_stat_mgr_concurrent_partition_churn.groovy | 5 +++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 3b40b1315cc6b9..5c5d2138c461cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -418,7 +418,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { // the replica is obsolete, ignore it. continue; } - Replica replica = invertedIndex.getReplica(entry.getKey(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(entry.getKey(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}", + entry.getKey(), beId, e); + continue; + } if (replica == null) { // replica may be deleted from catalog, ignore it. continue; diff --git a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy index d199a820350ef0..6a4d683b7157cc 100644 --- a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy +++ b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy @@ -23,7 +23,7 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { String dbName = context.config.getDbNameByFile(context.file) sql "select 1" - String tableName = "test_issue_59138" + String tableName = "test_tablet_stat_mgr_churn" String oldInterval = null AtomicBoolean stopped = new AtomicBoolean(false) AtomicReference firstError = new AtomicReference<>() @@ -40,6 +40,7 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { def readAppendedLog = { File file, long offset -> if (!file.exists()) { + logger.warn("skip checking appended log because {} does not exist", file.getAbsolutePath()) return "" } RandomAccessFile raf = new RandomAccessFile(file, "r") @@ -127,7 +128,7 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { def tableWorker = startWorker { int i = 0 while (!stopped.get()) { - String tempTable = "tmp_issue_59138_${i}" + String tempTable = "tmp_tablet_stat_mgr_churn_${i}" sql """ CREATE TABLE ${tempTable} ( `k1` INT NOT NULL, From ca6cc46f40513251e4a9bc2890cbf76ea71b8e6c Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 21 May 2026 14:21:53 +0800 Subject: [PATCH 3/7] optimize code --- .../src/main/java/org/apache/doris/catalog/LocalTablet.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 17f3911bfc62b5..46fc749493c80e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -26,6 +26,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; From 9714dbeb08db03cd889347cde65d2203f3c38be3 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 21 May 2026 14:21:54 +0800 Subject: [PATCH 4/7] [improvement] close residual CME race with volatile-snapshot replica/tablet lists Replace defensive-copy-on-read with copy-on-write via volatile snapshot for LocalTablet.replicas and MaterializedIndex.tablets. Writers (synchronized) build a new list and swap the volatile reference; readers take a single volatile read and iterate an immutable snapshot, so getReplicas()/getTablets() can no longer throw ConcurrentModificationException even while a concurrent DDL thread mutates the tablet, and the hot read path no longer copies elements. Also add a staleTabletStatSkipped counter to TabletStatMgr to make the TOCTOU skip observable, harden the regression test with a positive SHOW DATA assertion and orphan-table cleanup, and note the cloud stat-mgr path is covered. --- .../doris/catalog/CloudTabletStatMgr.java | 1 + .../org/apache/doris/catalog/LocalTablet.java | 76 +++++++++++-------- .../doris/catalog/MaterializedIndex.java | 38 ++++++---- .../apache/doris/catalog/TabletStatMgr.java | 19 ++++- ...stat_mgr_concurrent_partition_churn.groovy | 17 +++++ 5 files changed, 102 insertions(+), 49 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java index 6765a001492301..118aa401227849 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -366,6 +366,7 @@ private void updateStatInfo(List dbIds) { long tabletIndexSize = 0L; long tabletSegmentSize = 0L; + // getReplicas() returns an immutable volatile snapshot; CME-safe under concurrent DDL. for (Replica replica : tablet.getReplicas()) { if (replica.getDataSize() > tabletDataSize) { tabletDataSize = replica.getDataSize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 46fc749493c80e..8b0bbff5131f57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -26,14 +26,13 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.stream.LongStream; @@ -41,7 +40,7 @@ public class LocalTablet extends Tablet { private static final Logger LOG = LogManager.getLogger(LocalTablet.class); @SerializedName(value = "rs", alternate = {"replicas"}) - private List replicas; + private volatile List replicas; @SerializedName(value = "lastCheckTime") private long lastCheckTime; @@ -224,29 +223,32 @@ public void setLastCheckTime(long lastCheckTime) { this.lastCheckTime = lastCheckTime; } - private boolean isLatestReplicaAndDeleteOld(Replica newReplica) { - boolean delete = false; + // Writers are synchronized on this tablet to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT hold + // the OlapTable write lock when modifying replicas. + // Readers capture the volatile reference once and iterate freely — no lock needed. + + @Override + public synchronized void addReplica(Replica replica, boolean isRestore) { + long version = replica.getVersion(); + long backendId = replica.getBackendIdWithoutException(); boolean hasBackend = false; - long version = newReplica.getVersion(); - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); - if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) { + boolean deletedOld = false; + List current = replicas; + List next = new ArrayList<>(current.size() + 1); + for (Replica r : current) { + if (r.getBackendIdWithoutException() == backendId) { hasBackend = true; - if (replica.getVersion() <= version) { - iterator.remove(); - delete = true; + if (r.getVersion() <= version) { + deletedOld = true; + continue; // drop stale replica } } + next.add(r); } - - return delete || !hasBackend; - } - - @Override - public void addReplica(Replica replica, boolean isRestore) { - if (isLatestReplicaAndDeleteOld(replica)) { - replicas.add(replica); + if (deletedOld || !hasBackend) { + next.add(replica); + replicas = next; // volatile write; readers see the new immutable snapshot if (!isRestore) { Env.getCurrentInvertedIndex().addReplica(id, replica); } @@ -255,7 +257,8 @@ public void addReplica(Replica replica, boolean isRestore) { @Override public List getReplicas() { - return Lists.newArrayList(this.replicas); + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(replicas); } @Override @@ -269,9 +272,12 @@ public Replica getReplicaByBackendId(long backendId) { } @Override - public boolean deleteReplica(Replica replica) { - if (replicas.contains(replica)) { - replicas.remove(replica); + public synchronized boolean deleteReplica(Replica replica) { + List current = replicas; + if (current.contains(replica)) { + List next = new ArrayList<>(current); + next.remove(replica); + replicas = next; // volatile write Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException()); return true; } @@ -279,16 +285,22 @@ public boolean deleteReplica(Replica replica) { } @Override - public boolean deleteReplicaByBackendId(long backendId) { - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); + public synchronized boolean deleteReplicaByBackendId(long backendId) { + List current = replicas; + List next = new ArrayList<>(current.size()); + Replica found = null; + for (Replica replica : current) { if (replica.getBackendIdWithoutException() == backendId) { - iterator.remove(); - Env.getCurrentInvertedIndex().deleteReplica(id, backendId); - return true; + found = replica; + } else { + next.add(replica); } } + if (found != null) { + replicas = next; // volatile write + Env.getCurrentInvertedIndex().deleteReplica(id, backendId); + return true; + } return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index d18c670786934b..9842d673564c8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -19,10 +19,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,7 +60,7 @@ public enum IndexExtState { private Map idToTablets; @SerializedName(value = "tablets") // this is for keeping tablet order - private List tablets; + private volatile List tablets; // for push after rollup index finished @SerializedName(value = "rollupIndexId") @@ -94,12 +94,14 @@ public MaterializedIndex(long id, IndexState state) { } public List getTablets() { - return Lists.newArrayList(tablets); + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(tablets); } public List getTabletIdsInOrder() { - List tabletIds = Lists.newArrayListWithCapacity(tablets.size()); - for (Tablet tablet : tablets) { + List snapshot = tablets; // single volatile read + List tabletIds = new ArrayList<>(snapshot.size()); + for (Tablet tablet : snapshot) { tabletIds.add(tablet.getId()); } return tabletIds; @@ -109,18 +111,26 @@ public Tablet getTablet(long tabletId) { return idToTablets.get(tabletId); } - public void clearTabletsForRestore() { + public synchronized void clearTabletsForRestore() { idToTablets.clear(); - tablets.clear(); + tablets = new ArrayList<>(); // volatile write } - public void addTablet(Tablet tablet, TabletMeta tabletMeta) { + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) { addTablet(tablet, tabletMeta, false); } - public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { + // Writers are synchronized on this index to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets) do NOT hold the OlapTable + // write lock when adding tablets. + // Copy-on-write makes incremental add O(n); bulk creation is thus O(n^2), but n + // (bucket count) is small and creation cost is dominated by replica/RPC work — the + // copy is negligible, and CME-safe reads on the hot query path are worth it. + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { idToTablets.put(tablet.getId(), tablet); - tablets.add(tablet); + List next = new ArrayList<>(tablets); + next.add(tablet); + tablets = next; // volatile write; readers see the new immutable snapshot if (!isRestore) { Env.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta); } @@ -241,8 +251,9 @@ public long getRemoteSegmentSize() { } public int getTabletOrderIdx(long tabletId) { + List snapshot = tablets; // single volatile read int idx = 0; - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { if (tablet.getId() == tabletId) { return idx; } @@ -279,15 +290,16 @@ public boolean equals(Object obj) { @Override public String toString() { + List snapshot = tablets; // single volatile read StringBuilder buffer = new StringBuilder(); buffer.append("index id: ").append(id).append("; "); buffer.append("index state: ").append(state.name()).append("; "); buffer.append("row count: ").append(rowCount).append("; "); - buffer.append("tablets size: ").append(tablets.size()).append("; "); + buffer.append("tablets size: ").append(snapshot.size()).append("; "); // buffer.append("tablets: ["); - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { buffer.append("tablet: ").append(tablet.toString()).append(", "); } buffer.append("]; "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 5c5d2138c461cc..749f5a5736e1a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /* @@ -58,6 +59,14 @@ public class TabletStatMgr extends MasterDaemon { private MarkedCountDownLatch updateTabletStatsLatch = null; + // Counts TOCTOU skips (getTabletMeta != null but getReplica throws IllegalStateException). + // Exposed for testing: a value > 0 after a concurrent-DDL workload proves the race window was hit. + static final AtomicLong staleTabletStatSkipped = new AtomicLong(0); + + public static long getStaleTabletStatSkippedCount() { + return staleTabletStatSkipped.get(); + } + public TabletStatMgr() { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); } @@ -386,8 +395,9 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { try { replica = invertedIndex.getReplica(stat.getTabletId(), beId); } catch (IllegalStateException e) { - LOG.debug("skip stale tablet stat update for tablet {} on backend {}", - stat.getTabletId(), beId, e); + staleTabletStatSkipped.incrementAndGet(); + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + stat.getTabletId(), beId, e.getMessage()); continue; } if (replica != null) { @@ -422,8 +432,9 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { try { replica = invertedIndex.getReplica(entry.getKey(), beId); } catch (IllegalStateException e) { - LOG.debug("skip stale tablet stat update for tablet {} on backend {}", - entry.getKey(), beId, e); + staleTabletStatSkipped.incrementAndGet(); + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + entry.getKey(), beId, e.getMessage()); continue; } if (replica == null) { diff --git a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy index 6a4d683b7157cc..5267ce3271561c 100644 --- a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy +++ b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy @@ -17,6 +17,7 @@ import java.io.RandomAccessFile import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { @@ -27,6 +28,8 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { String oldInterval = null AtomicBoolean stopped = new AtomicBoolean(false) AtomicReference firstError = new AtomicReference<>() + // Track temp tables created by tableWorker so we can clean them up in finally. + AtomicInteger tableWorkerCount = new AtomicInteger(0) String dorisHome = System.getProperty("DORIS_HOME") if (dorisHome == null || dorisHome.isEmpty()) { @@ -129,6 +132,7 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { int i = 0 while (!stopped.get()) { String tempTable = "tmp_tablet_stat_mgr_churn_${i}" + tableWorkerCount.set(i) sql """ CREATE TABLE ${tempTable} ( `k1` INT NOT NULL, @@ -156,7 +160,16 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { def rowCount = sql """ SELECT count(*) FROM ${tableName} """ assertTrue(rowCount[0][0].toLong() > 0L) + // Wait for at least 2 more stat-mgr ticks (interval = 1 s) so that the daemon has a + // chance to run after the DDL storm and we can verify it completed successfully. sleep(3000) + + // Positive assertion: stat manager must have successfully updated the table's stats. + // DataSize is non-zero only if at least one tick completed without crashing. + def showData = sql """ SHOW DATA FROM ${tableName} """ + assertTrue(showData != null && !showData.isEmpty(), + "SHOW DATA returned no rows — stat mgr may not have run") + String appendedLogs = readAppendedLog(feLog, feLogOffset) + readAppendedLog(feWarnLog, feWarnLogOffset) assertFalse(appendedLogs.contains("ConcurrentModificationException")) assertFalse(appendedLogs.contains("daemon thread got exception. name: tablet stat mgr")) @@ -166,5 +179,9 @@ suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "${oldInterval}") """ } sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ + // Clean up any orphaned temp tables left by tableWorker if it was interrupted mid-CREATE. + for (int i = 0; i <= tableWorkerCount.get(); i++) { + sql """ DROP TABLE IF EXISTS tmp_tablet_stat_mgr_churn_${i} FORCE """ + } } } From c646b5e28e298b60c278db44255d7b8546b85f67 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Fri, 22 May 2026 14:35:37 +0800 Subject: [PATCH 5/7] [improvement] cover concurrent replica/tablet reads with unit tests instead of e2e The end-to-end regression test relied on a timing-dependent DDL storm to hit the race window, which is unreliable on fast CI (DDL workers get serialized by table locks) and wastes cluster resources. Replace it with deterministic FE unit tests: - TabletTest / MaterializedIndexTest assert getReplicas()/getTablets() return an immutable copy-on-write snapshot (isolated from later writes, read-only) and that iterating the snapshot under concurrent mutation never throws CME. Also drop the staleTabletStatSkipped counter that only existed to let the removed e2e test observe the TOCTOU skip; the IllegalStateException catch already handles it. --- .../apache/doris/catalog/TabletStatMgr.java | 11 -- .../doris/catalog/MaterializedIndexTest.java | 65 ++++++ .../org/apache/doris/catalog/TabletTest.java | 79 ++++++++ ...stat_mgr_concurrent_partition_churn.groovy | 187 ------------------ 4 files changed, 144 insertions(+), 198 deletions(-) delete mode 100644 regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 749f5a5736e1a2..ec99c8d45a991a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -41,7 +41,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /* @@ -59,14 +58,6 @@ public class TabletStatMgr extends MasterDaemon { private MarkedCountDownLatch updateTabletStatsLatch = null; - // Counts TOCTOU skips (getTabletMeta != null but getReplica throws IllegalStateException). - // Exposed for testing: a value > 0 after a concurrent-DDL workload proves the race window was hit. - static final AtomicLong staleTabletStatSkipped = new AtomicLong(0); - - public static long getStaleTabletStatSkippedCount() { - return staleTabletStatSkipped.get(); - } - public TabletStatMgr() { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); } @@ -395,7 +386,6 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { try { replica = invertedIndex.getReplica(stat.getTabletId(), beId); } catch (IllegalStateException e) { - staleTabletStatSkipped.incrementAndGet(); LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", stat.getTabletId(), beId, e.getMessage()); continue; @@ -432,7 +422,6 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { try { replica = invertedIndex.getReplica(entry.getKey(), beId); } catch (IllegalStateException e) { - staleTabletStatSkipped.incrementAndGet(); LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", entry.getKey(), beId, e.getMessage()); continue; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java index e177717d366150..3ff3f2519a3d44 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TStorageMedium; import org.junit.After; import org.junit.Assert; @@ -35,6 +36,8 @@ import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class MaterializedIndexTest { @@ -73,6 +76,68 @@ public void getMethodTest() { Assert.assertEquals(indexId, index.getId()); } + @Test + public void testGetTabletsReturnsImmutableSnapshot() { + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + index.addTablet(new LocalTablet(1L), tabletMeta, true); + + List snapshot = index.getTablets(); + Assert.assertEquals(1, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + index.addTablet(new LocalTablet(2L), tabletMeta, true); + Assert.assertEquals(1, snapshot.size()); + Assert.assertEquals(2, index.getTablets().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L))); + } + + @Test + public void testConcurrentGetTabletsNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getTablets() while a writer keeps + // adding tablets. Copy-on-write guarantees the reader never observes a partially + // built list or throws ConcurrentModificationException. + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + index.addTablet(new LocalTablet(id++), tabletMeta, true); + // Keep the list bounded (and exercise the clear path) so the test stays fast. + if (index.getTablets().size() > 64) { + index.clearTabletsForRestore(); + } + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Tablet tablet : index.getTablets()) { + tablet.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getTablets() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { // 1. Write objects to file diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index eaafb31000e47c..2f89dcdeea8298 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -38,6 +38,9 @@ import java.io.DataOutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TabletTest { @@ -118,6 +121,82 @@ public void deleteReplicaTest() { Assert.assertEquals(1, tablet.getReplicas().size()); } + @Test + public void testGetReplicasReturnsImmutableSnapshot() { + List snapshot = tablet.getReplicas(); + Assert.assertEquals(3, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + Replica replica4 = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0); + tablet.addReplica(replica4); + Assert.assertEquals(3, snapshot.size()); + Assert.assertEquals(4, tablet.getReplicas().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(replica4)); + } + + @Test + public void testIterateReplicasWhileMutatingDoesNotThrow() { + // Iterating the snapshot returned by getReplicas() must not throw + // ConcurrentModificationException even when the tablet is structurally modified + // during iteration. + int seen = 0; + for (Replica r : tablet.getReplicas()) { + Assert.assertNotNull(r); + tablet.addReplica(new LocalReplica(100L + seen, 100L + seen, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0)); + tablet.deleteReplicaByBackendId(2L); + seen++; + } + Assert.assertEquals(3, seen); + } + + @Test + public void testConcurrentGetReplicasNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps + // mutating the replica list. Copy-on-write guarantees the reader never observes a + // partially built list or throws ConcurrentModificationException. + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + // Reuse a small set of backend ids so the list stays bounded while still + // exercising the add/replace path. + long beId = id % 8; + tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0), true); + id++; + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Replica r : tablet.getReplicas()) { + r.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getReplicas() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { final Path path = Files.createTempFile("olapTabletTest", "tmp"); diff --git a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy deleted file mode 100644 index 5267ce3271561c..00000000000000 --- a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy +++ /dev/null @@ -1,187 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import java.io.RandomAccessFile -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference - -suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { - String dbName = context.config.getDbNameByFile(context.file) - sql "select 1" - - String tableName = "test_tablet_stat_mgr_churn" - String oldInterval = null - AtomicBoolean stopped = new AtomicBoolean(false) - AtomicReference firstError = new AtomicReference<>() - // Track temp tables created by tableWorker so we can clean them up in finally. - AtomicInteger tableWorkerCount = new AtomicInteger(0) - - String dorisHome = System.getProperty("DORIS_HOME") - if (dorisHome == null || dorisHome.isEmpty()) { - dorisHome = context.config.suitePath.replace("/regression-test/suites", "") - } - - File feLog = new File("${dorisHome}/fe/log/fe.log") - File feWarnLog = new File("${dorisHome}/fe/log/fe.warn.log") - long feLogOffset = feLog.exists() ? feLog.length() : 0L - long feWarnLogOffset = feWarnLog.exists() ? feWarnLog.length() : 0L - - def readAppendedLog = { File file, long offset -> - if (!file.exists()) { - logger.warn("skip checking appended log because {} does not exist", file.getAbsolutePath()) - return "" - } - RandomAccessFile raf = new RandomAccessFile(file, "r") - try { - long safeOffset = Math.min(offset, raf.length()) - raf.seek(safeOffset) - int size = (int) (raf.length() - safeOffset) - if (size <= 0) { - return "" - } - byte[] bytes = new byte[size] - raf.readFully(bytes) - return new String(bytes, "UTF-8") - } finally { - raf.close() - } - } - - def failIfNeeded = { - if (firstError.get() != null) { - throw firstError.get() - } - } - - def startWorker = { Closure body -> - Thread.startDaemon { - try { - connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) { - sql "use ${dbName}" - body() - } - } catch (Throwable t) { - logger.warn("tablet stat mgr concurrency worker failed", t) - firstError.compareAndSet(null, t) - stopped.set(true) - } - } - } - - try { - def configRow = sql """ ADMIN SHOW FRONTEND CONFIG LIKE 'tablet_stat_update_interval_second' """ - oldInterval = configRow[0][1] - sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "1") """ - - sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ - sql """ - CREATE TABLE ${tableName} ( - `k1` INT NOT NULL, - `k2` INT NOT NULL - ) - DUPLICATE KEY(`k1`) - PARTITION BY RANGE(`k1`) ( - PARTITION p0 VALUES LESS THAN ("100"), - PARTITION p1 VALUES LESS THAN ("200") - ) - DISTRIBUTED BY HASH(`k1`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ) - """ - - sql """ INSERT INTO ${tableName} VALUES (1, 1), (101, 1) """ - - def insertWorker = startWorker { - int i = 0 - while (!stopped.get()) { - int left = i % 90 - int right = 100 + (i % 90) - sql """ INSERT INTO ${tableName} VALUES (${left}, ${i}), (${right}, ${i}) """ - i++ - } - } - - def partitionWorker = startWorker { - int i = 0 - while (!stopped.get()) { - String partitionName = "p_dyn_${i}" - int upperBound = 300 + i * 100 - sql """ ALTER TABLE ${tableName} ADD PARTITION ${partitionName} VALUES LESS THAN ("${upperBound}") """ - sql """ ALTER TABLE ${tableName} DROP PARTITION IF EXISTS ${partitionName} FORCE """ - i++ - } - } - - def tableWorker = startWorker { - int i = 0 - while (!stopped.get()) { - String tempTable = "tmp_tablet_stat_mgr_churn_${i}" - tableWorkerCount.set(i) - sql """ - CREATE TABLE ${tempTable} ( - `k1` INT NOT NULL, - `k2` INT NOT NULL - ) - DUPLICATE KEY(`k1`) - DISTRIBUTED BY HASH(`k1`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ) - """ - sql """ DROP TABLE IF EXISTS ${tempTable} FORCE """ - i++ - } - } - - sleep(12000) - stopped.set(true) - [insertWorker, partitionWorker, tableWorker].each { Thread thread -> - thread.join(10000) - } - - failIfNeeded() - sql "sync" - def rowCount = sql """ SELECT count(*) FROM ${tableName} """ - assertTrue(rowCount[0][0].toLong() > 0L) - - // Wait for at least 2 more stat-mgr ticks (interval = 1 s) so that the daemon has a - // chance to run after the DDL storm and we can verify it completed successfully. - sleep(3000) - - // Positive assertion: stat manager must have successfully updated the table's stats. - // DataSize is non-zero only if at least one tick completed without crashing. - def showData = sql """ SHOW DATA FROM ${tableName} """ - assertTrue(showData != null && !showData.isEmpty(), - "SHOW DATA returned no rows — stat mgr may not have run") - - String appendedLogs = readAppendedLog(feLog, feLogOffset) + readAppendedLog(feWarnLog, feWarnLogOffset) - assertFalse(appendedLogs.contains("ConcurrentModificationException")) - assertFalse(appendedLogs.contains("daemon thread got exception. name: tablet stat mgr")) - } finally { - stopped.set(true) - if (oldInterval != null) { - sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "${oldInterval}") """ - } - sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ - // Clean up any orphaned temp tables left by tableWorker if it was interrupted mid-CREATE. - for (int i = 0; i <= tableWorkerCount.get(); i++) { - sql """ DROP TABLE IF EXISTS tmp_tablet_stat_mgr_churn_${i} FORCE """ - } - } -} From 925a4db8559eb8778b905f2d86da9c22f5742d3a Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 22 May 2026 11:13:18 -0700 Subject: [PATCH 6/7] [improvement] batch tablet publish on bulk creation paths MaterializedIndex.addTablet performed copy-on-write per tablet, making bulk paths (createTablets, schema change, MV rollup, restore) quadratic inside the synchronized block. Add appendTablets(Collection) that does one O(n) copy + one volatile publish per batch, and convert InternalCatalog.createTablets, CloudInternalCatalog.createCloudTablets, SchemaChangeHandler, MaterializedViewHandler, RestoreJob, CloudRestoreJob, and OlapTable's restore loop to use it. TabletInvertedIndex registration stays per-iteration so Tablet.addReplica still finds the tablet; only the per-index list copy moves out of the loop. Readers remain CME-safe. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../doris/alter/MaterializedViewHandler.java | 17 +++++++-- .../doris/alter/SchemaChangeHandler.java | 19 ++++++++-- .../org/apache/doris/backup/RestoreJob.java | 9 ++++- .../doris/catalog/MaterializedIndex.java | 38 +++++++++++++++---- .../org/apache/doris/catalog/OlapTable.java | 7 +++- .../doris/cloud/backup/CloudRestoreJob.java | 8 +++- .../datasource/CloudInternalCatalog.java | 12 +++++- .../doris/datasource/InternalCatalog.java | 14 ++++++- 8 files changed, 102 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index f875c807847f2a..700fad2f552694 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -443,14 +443,22 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW); MaterializedIndex baseIndex = partition.getIndex(baseIndexId); short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum(); + // All MV tablets of the same (partition, mv index) share the same TabletMeta; + // build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet + // loop to keep copy-on-write O(n). TabletInvertedIndex registration stays + // per-iteration because Tablet.addReplica(...) below needs the tablet present + // in the inverted index. + TabletMeta mvTabletMeta = new TabletMeta( + dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); + List mvTabletsForPartition = Lists.newArrayListWithCapacity(baseIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet baseTablet : baseIndex.getTablets()) { - TabletMeta mvTabletMeta = new TabletMeta( - dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); long baseTabletId = baseTablet.getId(); long mvTabletId = idGeneratorBuffer.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId); - mvIndex.addTablet(newTablet, mvTabletMeta); + invertedIndex.addTablet(mvTabletId, mvTabletMeta); + mvTabletsForPartition.add(newTablet); addedTablets.add(newTablet); mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId); @@ -505,6 +513,9 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri } } // end for baseTablets + // Bulk-publish all MV tablets for this partition in one copy-on-write. + mvIndex.appendTablets(mvTabletsForPartition); + mvJob.addMVIndex(partitionId, mvIndex); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 2a1c3fba031f24..c52dbcbba86966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -54,6 +54,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.catalog.info.ColumnPosition; import org.apache.doris.catalog.info.IndexType; @@ -2050,14 +2051,23 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map shadowTabletsForPartition = Lists.newArrayListWithCapacity( + originIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet originTablet : originIndex.getTablets()) { - TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - newSchemaHash, medium); long originTabletId = originTablet.getId(); long shadowTabletId = idGeneratorBuffer.getNextId(); Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId); - shadowIndex.addTablet(shadowTablet, shadowTabletMeta); + invertedIndex.addTablet(shadowTabletId, shadowTabletMeta); + shadowTabletsForPartition.add(shadowTablet); addedTablets.add(shadowTablet); schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId); @@ -2115,6 +2125,9 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas try { Pair>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo() @@ -1577,6 +1579,9 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb return null; } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 9842d673564c8b..3fe55ee56eab26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -22,6 +22,7 @@ import com.google.gson.annotations.SerializedName; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -123,19 +124,42 @@ public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) { // Writers are synchronized on this index to prevent concurrent lost-update: // some callers (e.g. InternalCatalog.createTablets) do NOT hold the OlapTable // write lock when adding tablets. - // Copy-on-write makes incremental add O(n); bulk creation is thus O(n^2), but n - // (bucket count) is small and creation cost is dominated by replica/RPC work — the - // copy is negligible, and CME-safe reads on the hot query path are worth it. + // Copy-on-write keeps readers CME-safe without locking; for bulk creation use + // appendTablets(...) so the per-index tablets list is copied once per batch + // instead of once per tablet. public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { - idToTablets.put(tablet.getId(), tablet); - List next = new ArrayList<>(tablets); - next.add(tablet); - tablets = next; // volatile write; readers see the new immutable snapshot + appendTabletsInternal(Collections.singletonList(tablet)); if (!isRestore) { Env.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta); } } + // Bulk-publish: append the given tablets to this index's tablets list in a + // single copy-on-write (O(existing + batch) instead of O(n^2) over n + // single-tablet adds inside a synchronized block). + // + // Does NOT touch TabletInvertedIndex. Bulk-creation callers register tablets + // in TabletInvertedIndex eagerly inside their per-tablet loop because + // Tablet.addReplica(...) (non-restore) requires the tablet to already be + // present in the inverted index; only the per-index list copy is expensive + // enough to be worth batching. + public synchronized void appendTablets(Collection newTablets) { + appendTabletsInternal(newTablets); + } + + private void appendTabletsInternal(Collection newTablets) { + if (newTablets.isEmpty()) { + return; + } + List next = new ArrayList<>(tablets.size() + newTablets.size()); + next.addAll(tablets); + for (Tablet tablet : newTablets) { + idToTablets.put(tablet.getId(), tablet); + next.add(tablet); + } + tablets = next; // single volatile write publishes the whole batch + } + public void setIdForRestore(long idxId) { this.id = idxId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2482a0514e4a87..38cd24c9e2a450 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -972,10 +972,12 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() // generate new tablets in origin tablet order int tabletNum = idx.getTablets().size(); idx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(tabletNum); for (int i = 0; i < tabletNum; i++) { long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - idx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas if (Config.isCloudMode()) { long newReplicaId = Env.getCurrentEnv().getNextId(); @@ -1015,6 +1017,9 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() return new Status(ErrCode.COMMON_ERROR, e.getMessage()); } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + idx.appendTablets(newTablets); } if (createNewColocateGroup) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java index f47695adecf6e1..a9e6469a6d91bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java @@ -333,18 +333,22 @@ public Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTbl, int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); remoteIdx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas long newReplicaId = Env.getCurrentEnv().getNextId(); Replica replica = new CloudReplica(newReplicaId, null, Replica.ReplicaState.NORMAL, visibleVersion, schemaHash, dbId, localTbl.getId(), partitionId, remoteIdx.getId(), i); newTablet.addReplica(replica, true /* is restore */); } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 5bf096bc60e0cd..cabc936772df06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.catalog.CloudPartition; @@ -82,6 +83,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -448,11 +450,16 @@ private OlapFile.RowsetMetaCloudPB.Builder createInitialRowset(Tablet tablet, lo private void createCloudTablets(MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set tabletIdSet) throws DdlException { + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (see + // InternalCatalog.createTablets for rationale). + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { Tablet tablet = EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); long replicaId = Env.getCurrentEnv().getNextId(); @@ -461,6 +468,7 @@ private void createCloudTablets(MaterializedIndex index, ReplicaState replicaSta tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i); tablet.addReplica(replica); } + index.appendTablets(bucketTablets); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a445fd31fd4e05..15f78c23ff4fef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3419,12 +3419,19 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic } } + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (O(bucketNum) instead + // of O(bucketNum^2)). TabletInvertedIndex registration stays per-iteration + // because Tablet.addReplica(...) below needs the tablet present in the + // inverted index. + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); // get BackendIds @@ -3464,6 +3471,9 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum()); } + // Publish all bucket tablets to the materialized index in one batch. + index.appendTablets(bucketTablets); + if (groupId != null && chooseBackendsArbitrary) { colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); From 676d21ebfe1314d23e8d66f584ae8946d15ecc8d Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 22 May 2026 11:53:05 -0700 Subject: [PATCH 7/7] [improvement] close remaining unsynchronized reads in tablet/replica metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two follow-up review notes on the COW snapshot work: 1. MaterializedIndex.idToTablets was still a plain HashMap mutated under synchronized writers but read lock-free by getTablet(id) on scheduler, report, proc and other paths. Publish it as a volatile immutable snapshot in lockstep with the tablets list. Ordering preserves the tablets ⊆ idToTablets invariant: append publishes the map before the list (any id visible via iteration is queryable), clearTabletsForRestore drops the list before the map. 2. LocalTablet had readers re-reading the volatile `replicas` field more than once inside a single method (getRemoteDataSize, equals, readyToBeRepaired), defeating the COW snapshot guarantee. Capture one local snapshot at the top of each method and use it throughout. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../org/apache/doris/catalog/LocalTablet.java | 21 +++++++---- .../doris/catalog/MaterializedIndex.java | 37 ++++++++++++++----- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 8b0bbff5131f57..5db0a1286b5d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -109,13 +109,14 @@ public long getRemoteDataSize() { if (cooldownReplicaId <= 0) { return 0; } - for (Replica r : replicas) { + List snapshot = replicas; // single volatile read; reuse below + for (Replica r : snapshot) { if (r.getId() == cooldownReplicaId) { return r.getRemoteDataSize(); } } // return replica with max remoteDataSize - return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); + return snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); } @Override @@ -263,7 +264,7 @@ public List getReplicas() { @Override public Replica getReplicaByBackendId(long backendId) { - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read if (replica.getBackendIdWithoutException() == backendId) { return replica; } @@ -315,13 +316,17 @@ public boolean equals(Object obj) { LocalTablet tablet = (LocalTablet) obj; - if (replicas != tablet.replicas) { - if (replicas.size() != tablet.replicas.size()) { + // Capture one snapshot per side so a concurrent writer cannot publish + // a different list between size/contains/get calls below. + List thisReplicas = replicas; + List otherReplicas = tablet.replicas; + if (thisReplicas != otherReplicas) { + if (thisReplicas.size() != otherReplicas.size()) { return false; } - int size = replicas.size(); + int size = thisReplicas.size(); for (int i = 0; i < size; i++) { - if (!tablet.replicas.contains(replicas.get(i))) { + if (!otherReplicas.contains(thisReplicas.get(i))) { return false; } } @@ -347,7 +352,7 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P } boolean allBeAliveOrDecommissioned = true; - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read; iteration on the snapshot Backend backend = infoService.getBackend(replica.getBackendIdWithoutException()); if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) { allBeAliveOrDecommissioned = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 3fe55ee56eab26..f84ff018ef6420 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -58,7 +58,13 @@ public enum IndexExtState { @SerializedName(value = "rowCount") private long rowCount; - private Map idToTablets; + // Published as a volatile immutable snapshot in lockstep with `tablets`. + // Writers (synchronized) build a fresh HashMap and assign the field; readers + // capture the reference once and call get/containsKey on the snapshot. + // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is also + // present in the map. This is preserved by publishing the map BEFORE the list + // on add and the list BEFORE the map on clear. + private volatile Map idToTablets; @SerializedName(value = "tablets") // this is for keeping tablet order private volatile List tablets; @@ -109,12 +115,15 @@ public List getTabletIdsInOrder() { } public Tablet getTablet(long tabletId) { + // Single volatile read of the immutable map snapshot. return idToTablets.get(tabletId); } public synchronized void clearTabletsForRestore() { - idToTablets.clear(); - tablets = new ArrayList<>(); // volatile write + // Drop the list first so iteration stops seeing tablets before + // lookup-by-id drops them. Maintains tablets ⊆ idToTablets. + tablets = new ArrayList<>(); + idToTablets = new HashMap<>(); } public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) { @@ -151,13 +160,17 @@ private void appendTabletsInternal(Collection newTablets) { if (newTablets.isEmpty()) { return; } - List next = new ArrayList<>(tablets.size() + newTablets.size()); - next.addAll(tablets); + Map nextMap = new HashMap<>(idToTablets); + List nextList = new ArrayList<>(tablets.size() + newTablets.size()); + nextList.addAll(tablets); for (Tablet tablet : newTablets) { - idToTablets.put(tablet.getId(), tablet); - next.add(tablet); + nextMap.put(tablet.getId(), tablet); + nextList.add(tablet); } - tablets = next; // single volatile write publishes the whole batch + // Publish the map first, then the list — so any id that appears in the + // visible `tablets` snapshot is already present in `idToTablets`. + idToTablets = nextMap; + tablets = nextList; } public void setIdForRestore(long idxId) { @@ -336,9 +349,13 @@ public String toString() { @Override public void gsonPostProcess() { - // build "idToTablets" from "tablets" + // Build a fresh "idToTablets" snapshot from the deserialized "tablets" list. + // Runs single-threaded during gson deserialization, before any concurrent + // reader can observe this object. + Map map = new HashMap<>(tablets.size()); for (Tablet tablet : tablets) { - idToTablets.put(tablet.getId(), tablet); + map.put(tablet.getId(), tablet); } + idToTablets = map; } }