Skip to content

Commit 1edc29a

Browse files
authored
[BugFix] Fix mv tablet meta inconsistent between FE leader and follower (StarRocks#69428)
Signed-off-by: shuming.li <ming.moriarty@gmail.com>
1 parent 4e6e2cb commit 1edc29a

File tree

5 files changed

+285
-21
lines changed

5 files changed

+285
-21
lines changed

fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1219,6 +1219,9 @@ public void dropPartition(long dbId, String partitionName, boolean isForceDrop)
12191219
// if mv's partition is dropped, we need to update mv's timeliness.
12201220
GlobalStateMgr.getCurrentState().getMaterializedViewMgr()
12211221
.triggerTimelessInfoEvent(this, MVTimelinessMgr.MVChangeEvent.MV_PARTITION_DROPPED);
1222+
1223+
// Update schema update time to invalidate cached plans
1224+
lastSchemaUpdateTime.set(System.nanoTime());
12221225
}
12231226

12241227
@Override

fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1585,8 +1585,17 @@ private Map<Long, List<Long>> mapTabletsToPartitions() {
15851585

15861586
for (Long partitionId : selectedPartitionIds) {
15871587
Partition partition = olapTable.getPartition(partitionId);
1588+
if (partition == null) {
1589+
throw new RuntimeException(String.format("Partition %d not found in table %s. ",
1590+
partitionId, olapTable.getName()));
1591+
}
15881592
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
15891593
MaterializedIndex materializedIndex = physicalPartition.getLatestIndex(index.indexMetaId);
1594+
if (materializedIndex == null) {
1595+
throw new RuntimeException(String.format("Materialized index with meta id %d " +
1596+
"not found in partition %s (physical partition id: %d) of table %s. ",
1597+
index.indexMetaId, partition.getName(), physicalPartition.getId(), olapTable.getName()));
1598+
}
15901599
for (long tabletId : materializedIndex.getTabletIdsInOrder()) {
15911600
tabletToPartitionMap.put(tabletId, physicalPartition.getId());
15921601
}
@@ -1596,8 +1605,11 @@ private Map<Long, List<Long>> mapTabletsToPartitions() {
15961605
for (Long tabletId : scanTabletIds) {
15971606
// for query: select count(1) from t tablet(tablet_id0, tablet_id1,...), the user-provided tablet_id
15981607
// may be invalid.
1599-
Preconditions.checkState(tabletToPartitionMap.containsKey(tabletId),
1600-
"Invalid tablet id: '" + tabletId + "'");
1608+
if (!tabletToPartitionMap.containsKey(tabletId)) {
1609+
throw new RuntimeException(String.format("Invalid tablet id: '%d'. The tablet may have been dropped. " +
1610+
"Selected partitions: %s, Table: %s, IndexMetaId: %d. ",
1611+
tabletId, selectedPartitionIds, olapTable.getName(), index.indexMetaId));
1612+
}
16011613
long partitionId = tabletToPartitionMap.get(tabletId);
16021614
partitionToTabletMap.computeIfAbsent(partitionId, k -> Lists.newArrayList()).add(tabletId);
16031615
}

fe/fe-core/src/main/java/com/starrocks/scheduler/mv/BaseMVRefreshProcessor.java

Lines changed: 5 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -23,11 +23,9 @@
2323
import com.google.common.util.concurrent.Uninterruptibles;
2424
import com.starrocks.catalog.BaseTableInfo;
2525
import com.starrocks.catalog.Column;
26-
import com.starrocks.catalog.DataProperty;
2726
import com.starrocks.catalog.Database;
2827
import com.starrocks.catalog.MaterializedView;
2928
import com.starrocks.catalog.OlapTable;
30-
import com.starrocks.catalog.Partition;
3129
import com.starrocks.catalog.PartitionInfo;
3230
import com.starrocks.catalog.ResourceGroup;
3331
import com.starrocks.catalog.Table;
@@ -62,7 +60,6 @@
6260
import com.starrocks.scheduler.persist.MVTaskRunExtraMessage;
6361
import com.starrocks.scheduler.persist.TaskRunStatus;
6462
import com.starrocks.server.GlobalStateMgr;
65-
import com.starrocks.server.LocalMetastore;
6663
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
6764
import com.starrocks.sql.analyzer.SemanticException;
6865
import com.starrocks.sql.ast.InsertStmt;
@@ -413,28 +410,17 @@ protected boolean syncPartitions() throws AnalysisException, LockTimeoutExceptio
413410
throw new DmlException("Force refresh failed, database:" + db.getFullName() + " not exist");
414411
}
415412
try {
416-
PartitionInfo partitionInfo = mv.getPartitionInfo();
417-
DataProperty dataProperty = null;
418-
if (!mv.isPartitionedTable()) {
419-
String partitionName = toRefreshPartitions.getPartitions().iterator().next().name();
420-
Partition partition = mv.getPartition(partitionName);
421-
dataProperty = partitionInfo.getDataProperty(partition.getId());
422-
mv.dropPartition(db.getId(), partitionName, true /* forceDrop */);
413+
// for non-partitioned MVs, or for complete refresh of partitioned MVs, just clear the visible
414+
// version map directly since all partitions will be refreshed.
415+
if (!mv.isPartitionedTable() || mvRefreshParams.isCompleteRefresh()) {
416+
mv.getRefreshScheme().getAsyncRefreshContext().clearVisibleVersionMap();
423417
} else {
424418
for (PCellWithName partName : toRefreshPartitions.getPartitions()) {
425419
mvRefreshPartitioner.dropPartition(db, mv, partName.name());
426420
}
427421
}
428-
429-
// for non-partitioned table, we need to build the partition here
430-
if (!mv.isPartitionedTable()) {
431-
LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
432-
ConnectContext connectContext = mvContext.getCtx();
433-
localMetastore.buildNonPartitionOlapTable(db, mv, partitionInfo, dataProperty,
434-
connectContext.getCurrentComputeResource());
435-
}
436422
} catch (Exception e) {
437-
logger.warn("failed to drop partitions {} for force refresh",
423+
logger.warn("failed to drop partitions or clear version map {} for force refresh",
438424
Joiner.on(",").join(toRefreshPartitions.getPartitionNames()),
439425
DebugUtil.getRootStackTrace(e));
440426
throw new AnalysisException("failed to drop partitions for force refresh: " + e.getMessage());

fe/fe-core/src/test/java/com/starrocks/catalog/MaterializedViewTest.java

Lines changed: 113 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1461,4 +1461,117 @@ public void testAsyncReloadSkipsWhenAlreadyReloaded() throws Exception {
14611461
Assertions.assertEquals(initialReloadState, mv.getReloadState(),
14621462
"Async onReload should skip when already reloaded");
14631463
}
1464+
1465+
/**
1466+
* Test that dropping a partition updates the lastSchemaUpdateTime to invalidate cached plans.
1467+
* This ensures that when a partition is dropped, any cached plans referencing the MV are invalidated.
1468+
*/
1469+
@Test
1470+
public void testDropPartitionUpdatesSchemaUpdateTime() throws Exception {
1471+
starRocksAssert.withDatabase("test").useDatabase("test")
1472+
.withTable("CREATE TABLE base_table_drop_partition\n" +
1473+
"(\n" +
1474+
" k1 date,\n" +
1475+
" k2 int,\n" +
1476+
" v1 int sum\n" +
1477+
")\n" +
1478+
"PARTITION BY RANGE(k1)\n" +
1479+
"(\n" +
1480+
" PARTITION p1 values [('2022-02-01'),('2022-02-16')),\n" +
1481+
" PARTITION p2 values [('2022-02-16'),('2022-03-01'))\n" +
1482+
")\n" +
1483+
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
1484+
"PROPERTIES('replication_num' = '1');")
1485+
.withMaterializedView("CREATE MATERIALIZED VIEW test_mv_drop_partition\n" +
1486+
"PARTITION BY k1\n" +
1487+
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
1488+
"REFRESH manual\n" +
1489+
"as select k1,k2,v1 from base_table_drop_partition;");
1490+
1491+
Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
1492+
MaterializedView mv = ((MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore()
1493+
.getTable(testDb.getFullName(), "test_mv_drop_partition"));
1494+
1495+
// Wait for initial reload
1496+
mv.waitForReloaded();
1497+
Assertions.assertTrue(mv.hasReloaded());
1498+
1499+
// Get the initial schema update time
1500+
long initialSchemaUpdateTime = mv.lastSchemaUpdateTime.get();
1501+
1502+
// Wait a small amount to ensure time progresses
1503+
Thread.sleep(10);
1504+
1505+
// Record time before drop partition
1506+
long beforeDropTime = System.nanoTime();
1507+
1508+
// Drop a partition
1509+
mv.dropPartition(testDb.getId(), "p1", false);
1510+
1511+
// Verify the partition was dropped
1512+
Assertions.assertNull(mv.getPartition("p1"), "Partition p1 should be dropped");
1513+
1514+
// Verify lastSchemaUpdateTime was updated
1515+
long afterDropSchemaUpdateTime = mv.lastSchemaUpdateTime.get();
1516+
Assertions.assertTrue(afterDropSchemaUpdateTime > initialSchemaUpdateTime,
1517+
"lastSchemaUpdateTime should be updated after dropping partition");
1518+
Assertions.assertTrue(afterDropSchemaUpdateTime >= beforeDropTime,
1519+
"lastSchemaUpdateTime should be >= time before drop");
1520+
}
1521+
1522+
/**
1523+
* Test that force dropping a partition also updates the lastSchemaUpdateTime.
1524+
*/
1525+
@Test
1526+
public void testForceDropPartitionUpdatesSchemaUpdateTime() throws Exception {
1527+
starRocksAssert.withDatabase("test").useDatabase("test")
1528+
.withTable("CREATE TABLE base_table_force_drop\n" +
1529+
"(\n" +
1530+
" k1 date,\n" +
1531+
" k2 int,\n" +
1532+
" v1 int sum\n" +
1533+
")\n" +
1534+
"PARTITION BY RANGE(k1)\n" +
1535+
"(\n" +
1536+
" PARTITION p1 values [('2022-02-01'),('2022-02-16')),\n" +
1537+
" PARTITION p2 values [('2022-02-16'),('2022-03-01'))\n" +
1538+
")\n" +
1539+
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
1540+
"PROPERTIES('replication_num' = '1');")
1541+
.withMaterializedView("CREATE MATERIALIZED VIEW test_mv_force_drop\n" +
1542+
"PARTITION BY k1\n" +
1543+
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
1544+
"REFRESH manual\n" +
1545+
"as select k1,k2,v1 from base_table_force_drop;");
1546+
1547+
Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
1548+
MaterializedView mv = ((MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore()
1549+
.getTable(testDb.getFullName(), "test_mv_force_drop"));
1550+
1551+
// Wait for initial reload
1552+
mv.waitForReloaded();
1553+
Assertions.assertTrue(mv.hasReloaded());
1554+
1555+
// Get the initial schema update time
1556+
long initialSchemaUpdateTime = mv.lastSchemaUpdateTime.get();
1557+
1558+
// Wait a small amount to ensure time progresses
1559+
Thread.sleep(10);
1560+
1561+
// Record time before drop partition
1562+
long beforeDropTime = System.nanoTime();
1563+
1564+
// Force drop a partition
1565+
mv.dropPartition(testDb.getId(), "p1", true);
1566+
1567+
// Verify the partition was dropped
1568+
Assertions.assertNull(mv.getPartition("p1"), "Partition p1 should be force dropped");
1569+
1570+
// Verify lastSchemaUpdateTime was updated
1571+
long afterDropSchemaUpdateTime = mv.lastSchemaUpdateTime.get();
1572+
Assertions.assertTrue(afterDropSchemaUpdateTime > initialSchemaUpdateTime,
1573+
"lastSchemaUpdateTime should be updated after force dropping partition");
1574+
Assertions.assertTrue(afterDropSchemaUpdateTime >= beforeDropTime,
1575+
"lastSchemaUpdateTime should be >= time before force drop");
1576+
}
14641577
}

0 commit comments

Comments
 (0)