Skip to content

Commit d151969

Browse files
committed
[fix](cloud)Fix create dynamic table race with insert overwrite (apache#59489)
1. Thread 1 executed a DROP TABLE operation followed by a CREATE TABLE operation, which initialized a dynamic partition (Partition A) in memory. 2. Because cloud mode batches the partition edit logs as an optimization, the edit log for Partition A had not yet been written and synchronized to the followers. 3. Thread 2 began an INSERT INTO ... OVERWRITE PARTITION (*) operation, found Partition A in memory, and wrote a replace partition edit log. 4. On the follower, the replay thread attempted to replay the replace partition edit log, but that log depends on Partition A, which was absent from the follower's memory; this resulted in an exception and the subsequent termination of the process. Fix: in the batch path, add the partition to the table's in-memory state only at the moment its add-partition edit log is written, while holding the table write lock, so that the in-memory state and the edit log change atomically. Follower error log before the fix: ``` 2025-12-30 16:10:42,774 ERROR (replayer|123) [EditLog.loadJournal():1445] replay Operation Type 210, log id: 1910 java.lang.NullPointerException at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906) at org.apache.doris.catalog.OlapTable.checkPartition(OlapTable.java:2835) at org.apache.doris.catalog.OlapTable.replaceTempPartitions(OlapTable.java:2799) at org.apache.doris.catalog.Env.replayReplaceTempPartition(Env.java:6750) at org.apache.doris.persist.EditLog.loadJournal(EditLog.java:986) at org.apache.doris.catalog.Env.replayJournal(Env.java:3100) at org.apache.doris.catalog.Env$4.runOneCycle(Env.java:2862) at org.apache.doris.common.util.Daemon.run(Daemon.java:119) 2025-12-30 16:10:42,775 INFO (Thread-0|32) [DorisFE.lambda$start$0():159] Received shutdown signal, starting graceful shutdown... 2025-12-30 16:10:42,776 INFO (Thread-0|32) [DorisFE.gracefulShutdown():639] graceful shutdown finished ``` Issue Number: close #xxx Related PR: #xxx Problem Summary: None - Test <!-- At least one of them must be included. --> - [x] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. 
- [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent 7f5ba43 commit d151969

File tree

9 files changed

+187
-47
lines changed

9 files changed

+187
-47
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
609609
DynamicPartitionUtil.checkAlterAllowed(
610610
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
611611
}
612-
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
612+
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true, null);
613613
} else if (alterClause instanceof AddPartitionLikeClause) {
614614
if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
615615
DynamicPartitionUtil.checkAlterAllowed(

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3487,14 +3487,17 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio
34873487
* @param isCreateTable this call is for creating table
34883488
* @param generatedPartitionId the preset partition id for the partition to add
34893489
* @param writeEditLog whether to write an edit log for this addition
3490-
* @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added.
3490+
* @param batchPartitions output parameter, used to batch write edit log outside this function, can be null.
3491+
* first is editlog PartitionPersistInfo, second is the added Partition
34913492
* @throws DdlException
34923493
*/
3493-
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
3494+
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
34943495
boolean isCreateTable, long generatedPartitionId,
3495-
boolean writeEditLog) throws DdlException {
3496-
return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
3497-
isCreateTable, generatedPartitionId, writeEditLog);
3496+
boolean writeEditLog,
3497+
List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
3498+
throws DdlException {
3499+
getInternalCatalog().addPartition(db, tableName, addPartitionClause,
3500+
isCreateTable, generatedPartitionId, writeEditLog, batchPartitions);
34983501
}
34993502

35003503
public void addMultiPartitions(Database db, String tableName, AlterMultiPartitionClause multiPartitionClause)

fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
712712
cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds,
713713
db, tableName, generatedPartitionIds);
714714

715-
List<PartitionPersistInfo> partsInfo = new ArrayList<>();
715+
List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo = new ArrayList<>();
716716
for (int i = 0; i < addPartitionClauses.size(); i++) {
717717
try {
718718
boolean needWriteEditLog = true;
@@ -721,15 +721,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
721721
if (Config.isCloudMode()) {
722722
needWriteEditLog = !executeFirstTime;
723723
}
724-
PartitionPersistInfo info =
725-
Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
724+
Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
726725
executeFirstTime,
727726
executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
728-
needWriteEditLog);
729-
if (info == null) {
730-
throw new Exception("null persisted partition returned");
731-
}
732-
partsInfo.add(info);
727+
needWriteEditLog, batchPartsInfo);
733728
clearCreatePartitionFailedMsg(olapTable.getId());
734729
} catch (Exception e) {
735730
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -739,7 +734,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
739734
}
740735
}
741736
}
742-
cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
737+
cloudBatchAfterCreatePartitions(executeFirstTime, batchPartsInfo,
743738
addPartitionClauses, db, olapTable, indexIds, tableName);
744739

745740
// ATTN: Breaking up dynamic partition table scheduling, consuming peak CPU consumption
@@ -759,15 +754,16 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
759754
}
760755
}
761756

762-
private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
763-
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
764-
OlapTable olapTable, List<Long> indexIds,
765-
String tableName) throws DdlException {
757+
private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
758+
List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo,
759+
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
760+
OlapTable olapTable, List<Long> indexIds,
761+
String tableName) throws DdlException {
766762
if (Config.isNotCloudMode()) {
767763
return;
768764
}
769-
List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
770-
-> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
765+
List<Long> succeedPartitionIds = batchPartsInfo.stream().map(partitionInfo
766+
-> partitionInfo.first.getPartition().getId()).collect(Collectors.toList());
771767
if (!executeFirstTime || addPartitionClauses.isEmpty()) {
772768
LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
773769
return;
@@ -784,28 +780,56 @@ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<Part
784780
succeedPartitionIds, indexIds, true);
785781
LOG.info("begin write edit log to add partitions in batch, "
786782
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
787-
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
783+
batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
788784
// ATTN: here, the edit log must be written after committing the cloud partition,
789785
// prevent commit RPC failure from causing data loss
790786
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
791787
LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
792788
// cloud partitions committed, but no edit log written yet
793789
throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
794790
}
795-
for (int i = 0; i < partsInfo.size(); i++) {
796-
Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
797-
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
798-
if (i == partsInfo.size() / 2) {
799-
LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
800-
// committed, but log some edit, others failed
801-
throw new Exception("debug point FE.DynamicPartitionScheduler"
802-
+ ".in.commitCloudPartition");
791+
792+
for (int i = 0; i < batchPartsInfo.size(); i++) {
793+
// take the table write lock to add the partition: writing the edit log and modifying the table state must be atomic
794+
olapTable.writeLockOrDdlException();
795+
try {
796+
boolean isTempPartition = addPartitionClauses.get(i).isTempPartition();
797+
Partition toAddPartition = batchPartsInfo.get(i).second;
798+
String partitionName = toAddPartition.getName();
799+
// ATTN: Check here to see if the newly created dynamic
800+
// partition has already been added by another process.
801+
// If it has, do not add this dynamic partition again,
802+
// and call `onErasePartition` to clean up any remaining information.
803+
Partition checkIsAdded = olapTable.getPartition(partitionName, isTempPartition);
804+
if (checkIsAdded != null) {
805+
LOG.warn("dynamic partition has been added, skip it. "
806+
+ "db: {}, table: {}, partition: {}, tableId: {}",
807+
db.getFullName(), tableName, partitionName, olapTable.getId());
808+
Env.getCurrentEnv().onErasePartition(toAddPartition);
809+
continue;
810+
}
811+
if (isTempPartition) {
812+
olapTable.addTempPartition(toAddPartition);
813+
} else {
814+
olapTable.addPartition(toAddPartition);
803815
}
816+
817+
Env.getCurrentEnv().getEditLog().logAddPartition(batchPartsInfo.get(i).first);
818+
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
819+
if (i == batchPartsInfo.size() / 2) {
820+
LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
821+
// committed, but only some of the edit logs were written; the rest failed
822+
throw new Exception("debug point FE.DynamicPartitionScheduler"
823+
+ ".in.commitCloudPartition");
824+
}
825+
}
826+
} finally {
827+
olapTable.writeUnlock();
804828
}
805829
}
806830
LOG.info("finish write edit log to add partitions in batch, "
807831
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
808-
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
832+
batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
809833
} catch (Exception e) {
810834
LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
811835
db.getFullName(), tableName, olapTable.getId(), e.getMessage());

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,7 +1582,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau
15821582
} finally {
15831583
table.readUnlock();
15841584
}
1585-
addPartition(db, tableName, clause, false, 0, true);
1585+
addPartition(db, tableName, clause, false, 0, true, null);
15861586

15871587
} catch (UserException e) {
15881588
throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName()
@@ -1602,7 +1602,9 @@ public static long checkAndGetBufferSize(long indexNum, long bucketNum,
16021602

16031603
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
16041604
boolean isCreateTable, long generatedPartitionId,
1605-
boolean writeEditLog) throws DdlException {
1605+
boolean writeEditLog,
1606+
List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
1607+
throws DdlException {
16061608
// in cloud mode, isCreateTable == true is used when creating dynamic partitions, so partitionId must already have been generated.
16071609
// isCreateTable == false covers the other cases: partitionId is generated below, so generatedPartitionId must be set to 0
16081610
if (!FeConstants.runningUnitTest && Config.isCloudMode()
@@ -1631,7 +1633,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
16311633
if (singlePartitionDesc.isSetIfNotExists()) {
16321634
LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
16331635
if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
1634-
return null;
1636+
return;
16351637
}
16361638
} else {
16371639
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
@@ -1838,7 +1840,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
18381840
LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
18391841
if (singlePartitionDesc.isSetIfNotExists()) {
18401842
failedCleanCallback.run();
1841-
return null;
1843+
return;
18421844
} else {
18431845
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
18441846
}
@@ -1896,12 +1898,6 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
18961898
// update partition info
18971899
partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
18981900

1899-
if (isTempPartition) {
1900-
olapTable.addTempPartition(partition);
1901-
} else {
1902-
olapTable.addPartition(partition);
1903-
}
1904-
19051901
// log
19061902
PartitionPersistInfo info = null;
19071903
if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -1926,11 +1922,16 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
19261922
}
19271923
if (writeEditLog) {
19281924
Env.getCurrentEnv().getEditLog().logAddPartition(info);
1925+
if (isTempPartition) {
1926+
olapTable.addTempPartition(partition);
1927+
} else {
1928+
olapTable.addPartition(partition);
1929+
}
19291930
LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
19301931
} else {
1932+
batchPartitions.add(Pair.of(info, partition));
19311933
LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition);
19321934
}
1933-
return info;
19341935
} finally {
19351936
olapTable.writeUnlock();
19361937
}
@@ -1954,7 +1955,7 @@ public void addMultiPartitions(Database db, String tableName, AlterMultiPartitio
19541955
for (SinglePartitionDesc singlePartitionDesc : singlePartitionDescs) {
19551956
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, null,
19561957
multiPartitionClause.getProperties(), false);
1957-
addPartition(db, tableName, addPartitionClause, false, 0, true);
1958+
addPartition(db, tableName, addPartitionClause, false, 0, true, null);
19581959
}
19591960
}
19601961

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ public static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc)
401401
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
402402
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
403403
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause,
404-
false, 0, true);
404+
false, 0, true, null);
405405
}
406406

407407
/**

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1668,14 +1668,29 @@ public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
16681668
}
16691669

16701670
public long logAddPartition(PartitionPersistInfo info) {
1671+
if (DebugPointUtil.isEnable("FE.logAddPartition.slow")) {
1672+
DebugPointUtil.DebugPoint debugPoint = DebugPointUtil.getDebugPoint("FE.logAddPartition.slow");
1673+
String pName = debugPoint.param("pName", "");
1674+
if (info.getPartition().getName().equals(pName)) {
1675+
int sleepMs = debugPoint.param("sleep", 1000);
1676+
LOG.info("logAddPartition debug point hit, pName {}, sleep {} s", pName, sleepMs);
1677+
try {
1678+
Thread.sleep(sleepMs);
1679+
} catch (InterruptedException e) {
1680+
LOG.warn("sleep interrupted", e);
1681+
}
1682+
}
1683+
}
16711684
long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
1685+
LOG.info("log add partition, logId:{}, info: {}", logId, info.toJson());
16721686
AddPartitionRecord record = new AddPartitionRecord(logId, info);
16731687
Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
16741688
return logId;
16751689
}
16761690

16771691
public long logDropPartition(DropPartitionInfo info) {
16781692
long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
1693+
LOG.info("log drop partition, logId:{}, info: {}", logId, info.toJson());
16791694
Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
16801695
return logId;
16811696
}
@@ -1686,6 +1701,7 @@ public void logErasePartition(long partitionId) {
16861701

16871702
public void logRecoverPartition(RecoverInfo info) {
16881703
long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
1704+
LOG.info("log recover partition, logId:{}, info: {}", logId, info.toJson());
16891705
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
16901706
}
16911707

@@ -1704,6 +1720,7 @@ public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {
17041720

17051721
public void logDropTable(DropInfo info) {
17061722
long logId = logEdit(OperationType.OP_DROP_TABLE, info);
1723+
LOG.info("log drop table, logId : {}, infos: {}", logId, info);
17071724
if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
17081725
DropTableRecord record = new DropTableRecord(logId, info);
17091726
Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
@@ -1716,11 +1733,13 @@ public void logEraseTable(long tableId) {
17161733

17171734
public void logRecoverTable(RecoverInfo info) {
17181735
long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
1736+
LOG.info("log recover table, logId : {}, infos: {}", logId, info);
17191737
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
17201738
}
17211739

17221740
public void logDropRollup(DropInfo info) {
17231741
long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
1742+
LOG.info("log drop rollup, logId : {}, infos: {}", logId, info);
17241743
Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
17251744
}
17261745

@@ -1837,7 +1856,8 @@ public void logDropRole(PrivInfo info) {
18371856
}
18381857

18391858
public void logDatabaseRename(DatabaseInfo databaseInfo) {
1840-
logEdit(OperationType.OP_RENAME_DB, databaseInfo);
1859+
long logId = logEdit(OperationType.OP_RENAME_DB, databaseInfo);
1860+
LOG.info("log database rename, logId : {}, infos: {}", logId, databaseInfo);
18411861
}
18421862

18431863
public void logTableRename(TableInfo tableInfo) {

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3762,7 +3762,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
37623762
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
37633763
try {
37643764
// here maybe check and limit created partitions num
3765-
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true);
3765+
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true, null);
37663766
} catch (DdlException e) {
37673767
LOG.warn(e);
37683768
errorStatus.setErrorMsgs(

regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy renamed to regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import org.codehaus.groovy.runtime.IOGroovyMethods
1919

20-
suite ("diffrent_serialize_cloud") {
20+
suite ("different_serialize_cloud") {
2121

2222
sql """ DROP TABLE IF EXISTS d_table; """
2323

0 commit comments

Comments
 (0)