Skip to content

Commit c3af819

Browse files
deardengyiguolei
authored and committed
[fix](cloud) Fix create dynamic table race with insert overwrite (apache#59489)
1. Thread 1 executed a DROP TABLE operation, followed by a CREATE TABLE operation, which initialized a dynamic partition (Partition A) in memory.
2. Due to an optimization in cloud mode, the partition edit logs are written in a batch; however, the edit log for Partition A had not yet been synchronized to the followers.
3. Thread 2 began an INSERT INTO ... OVERWRITE PARTITION (*) operation, found Partition A in memory, and wrote a replace partition edit log.
4. On the follower, the replay thread attempted to replay the replace partition edit log, which depends on Partition A. Partition A was absent from the follower's memory, resulting in an exception and the subsequent termination of the process.

Fix

```
2025-12-30 16:10:42,774 ERROR (replayer|123) [EditLog.loadJournal():1445] replay Operation Type 210, log id: 1910
java.lang.NullPointerException
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906)
    at org.apache.doris.catalog.OlapTable.checkPartition(OlapTable.java:2835)
    at org.apache.doris.catalog.OlapTable.replaceTempPartitions(OlapTable.java:2799)
    at org.apache.doris.catalog.Env.replayReplaceTempPartition(Env.java:6750)
    at org.apache.doris.persist.EditLog.loadJournal(EditLog.java:986)
    at org.apache.doris.catalog.Env.replayJournal(Env.java:3100)
    at org.apache.doris.catalog.Env$4.runOneCycle(Env.java:2862)
    at org.apache.doris.common.util.Daemon.run(Daemon.java:119)
2025-12-30 16:10:42,775 INFO (Thread-0|32) [DorisFE.lambda$start$0():159] Received shutdown signal, starting graceful shutdown...
2025-12-30 16:10:42,776 INFO (Thread-0|32) [DorisFE.gracefulShutdown():639] graceful shutdown finished
```

Issue Number: close #xxx
Related PR: #xxx
Problem Summary: None

- Test <!-- At least one of them must be included. -->
    - [x] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
        - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
- [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent d704481 commit c3af819

File tree

8 files changed

+188
-48
lines changed

8 files changed

+188
-48
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3489,14 +3489,17 @@ public boolean createTable(CreateTableInfo createTableInfo) throws UserException
34893489
* @param isCreateTable this call is for creating table
34903490
* @param generatedPartitionId the preset partition id for the partition to add
34913491
* @param writeEditLog whether to write an edit log for this addition
3492-
* @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added.
3492+
* @param batchPartitions output parameter, used to batch write edit log outside this function; may be null only when writeEditLog is true.
3493+
* first is the editlog PartitionPersistInfo, second is the created Partition that has not yet been added to the table
34933494
* @throws DdlException
34943495
*/
3495-
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
3496+
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
34963497
boolean isCreateTable, long generatedPartitionId,
3497-
boolean writeEditLog) throws DdlException {
3498-
return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
3499-
isCreateTable, generatedPartitionId, writeEditLog);
3498+
boolean writeEditLog,
3499+
List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
3500+
throws DdlException {
3501+
getInternalCatalog().addPartition(db, tableName, addPartitionClause,
3502+
isCreateTable, generatedPartitionId, writeEditLog, batchPartitions);
35003503
}
35013504

35023505
public void addMultiPartitions(Database db, String tableName, AlterMultiPartitionClause multiPartitionClause)

fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
776776
cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds,
777777
db, tableName, generatedPartitionIds);
778778

779-
List<PartitionPersistInfo> partsInfo = new ArrayList<>();
779+
List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo = new ArrayList<>();
780780
for (int i = 0; i < addPartitionClauses.size(); i++) {
781781
try {
782782
boolean needWriteEditLog = true;
@@ -785,15 +785,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
785785
if (Config.isCloudMode()) {
786786
needWriteEditLog = !executeFirstTime;
787787
}
788-
PartitionPersistInfo info =
789-
Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
788+
Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
790789
executeFirstTime,
791790
executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
792-
needWriteEditLog);
793-
if (info == null) {
794-
throw new Exception("null persisted partition returned");
795-
}
796-
partsInfo.add(info);
791+
needWriteEditLog, batchPartsInfo);
797792
clearCreatePartitionFailedMsg(olapTable.getId());
798793
} catch (Exception e) {
799794
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -804,7 +799,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
804799
}
805800
}
806801
}
807-
cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
802+
cloudBatchAfterCreatePartitions(executeFirstTime, batchPartsInfo,
808803
addPartitionClauses, db, olapTable, indexIds, tableName);
809804

810805
// ATTN: Breaking up dynamic partition table scheduling, consuming peak CPU consumption
@@ -824,15 +819,16 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
824819
}
825820
}
826821

827-
private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
828-
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
829-
OlapTable olapTable, List<Long> indexIds,
830-
String tableName) throws DdlException {
822+
private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
823+
List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo,
824+
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
825+
OlapTable olapTable, List<Long> indexIds,
826+
String tableName) throws DdlException {
831827
if (Config.isNotCloudMode()) {
832828
return;
833829
}
834-
List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
835-
-> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
830+
List<Long> succeedPartitionIds = batchPartsInfo.stream().map(partitionInfo
831+
-> partitionInfo.first.getPartition().getId()).collect(Collectors.toList());
836832
if (!executeFirstTime || addPartitionClauses.isEmpty()) {
837833
LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
838834
return;
@@ -849,28 +845,56 @@ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<Part
849845
succeedPartitionIds, indexIds, true /* isCreateTable */, false /* isBatchCommit */);
850846
LOG.info("begin write edit log to add partitions in batch, "
851847
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
852-
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
848+
batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
853849
// ATTN: here, the edit log must be written after the cloud partition is committed,
854850
// prevent commit RPC failure from causing data loss
855851
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
856852
LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
857853
// committed, but the edit log has not been written
858854
throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
859855
}
860-
for (int i = 0; i < partsInfo.size(); i++) {
861-
Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
862-
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
863-
if (i == partsInfo.size() / 2) {
864-
LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
865-
// committed, but log some edit, others failed
866-
throw new Exception("debug point FE.DynamicPartitionScheduler"
867-
+ ".in.commitCloudPartition");
856+
857+
for (int i = 0; i < batchPartsInfo.size(); i++) {
858+
// take the table write lock to add the partition: writing the edit log and modifying the table state must be atomic
859+
olapTable.writeLockOrDdlException();
860+
try {
861+
boolean isTempPartition = addPartitionOps.get(i).isTempPartition();
862+
Partition toAddPartition = batchPartsInfo.get(i).second;
863+
String partitionName = toAddPartition.getName();
864+
// ATTN: Check here to see if the newly created dynamic
865+
// partition has already been added by another process.
866+
// If it has, do not add this dynamic partition again,
867+
// and call `onErasePartition` to clean up any remaining information.
868+
Partition checkIsAdded = olapTable.getPartition(partitionName, isTempPartition);
869+
if (checkIsAdded != null) {
870+
LOG.warn("dynamic partition has been added, skip it. "
871+
+ "db: {}, table: {}, partition: {}, tableId: {}",
872+
db.getFullName(), tableName, partitionName, olapTable.getId());
873+
Env.getCurrentEnv().onErasePartition(toAddPartition);
874+
continue;
875+
}
876+
if (isTempPartition) {
877+
olapTable.addTempPartition(toAddPartition);
878+
} else {
879+
olapTable.addPartition(toAddPartition);
868880
}
881+
882+
Env.getCurrentEnv().getEditLog().logAddPartition(batchPartsInfo.get(i).first);
883+
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
884+
if (i == batchPartsInfo.size() / 2) {
885+
LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
886+
// committed; some edit logs were written, the others failed
887+
throw new Exception("debug point FE.DynamicPartitionScheduler"
888+
+ ".in.commitCloudPartition");
889+
}
890+
}
891+
} finally {
892+
olapTable.writeUnlock();
869893
}
870894
}
871895
LOG.info("finish write edit log to add partitions in batch, "
872896
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
873-
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
897+
batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
874898
} catch (Exception e) {
875899
LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
876900
db.getFullName(), tableName, olapTable.getId(), e.getMessage());

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,7 +1358,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau
13581358
} finally {
13591359
table.readUnlock();
13601360
}
1361-
addPartition(db, tableName, clause, false, 0, true);
1361+
addPartition(db, tableName, clause, false, 0, true, null);
13621362

13631363
} catch (UserException e) {
13641364
throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName()
@@ -1447,9 +1447,11 @@ private ResultOr<CompletableFuture<Void>, DdlException> getCurrentPartitionFutur
14471447
}
14481448
}
14491449

1450-
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
1450+
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
14511451
boolean isCreateTable, long generatedPartitionId,
1452-
boolean writeEditLog) throws DdlException {
1452+
boolean writeEditLog,
1453+
List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
1454+
throws DdlException {
14531455
// in cloud mode, isCreateTable == true is used when creating dynamic partitions, so the partitionId must already have been generated.
14541456
// isCreateTable == false covers the other cases: the partitionId is generated below, so generatedPartitionId must be set to 0
14551457
if (!FeConstants.runningUnitTest && Config.isCloudMode()
@@ -1478,7 +1480,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
14781480
if (singlePartitionDesc.isSetIfNotExists()) {
14791481
LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
14801482
if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
1481-
return null;
1483+
return;
14821484
}
14831485
} else {
14841486
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
@@ -1645,7 +1647,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
16451647
db, tableName, olapTable, partitionName, singlePartitionDesc);
16461648
if (ownerFutureOr.isErr()) {
16471649
if (ownerFutureOr.unwrapErr() == null) {
1648-
return null;
1650+
return;
16491651
} else {
16501652
throw ownerFutureOr.unwrapErr();
16511653
}
@@ -1701,7 +1703,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
17011703
LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
17021704
if (singlePartitionDesc.isSetIfNotExists()) {
17031705
failedCleanCallback.run();
1704-
return null;
1706+
return;
17051707
} else {
17061708
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
17071709
}
@@ -1759,12 +1761,6 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
17591761
// update partition info
17601762
partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
17611763

1762-
if (isTempPartition) {
1763-
olapTable.addTempPartition(partition);
1764-
} else {
1765-
olapTable.addPartition(partition);
1766-
}
1767-
17681764
// log
17691765
PartitionPersistInfo info = null;
17701766
if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -1790,11 +1786,16 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
17901786
}
17911787
if (writeEditLog) {
17921788
Env.getCurrentEnv().getEditLog().logAddPartition(info);
1789+
if (isTempPartition) {
1790+
olapTable.addTempPartition(partition);
1791+
} else {
1792+
olapTable.addPartition(partition);
1793+
}
17931794
LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
17941795
} else {
1796+
batchPartitions.add(Pair.of(info, partition));
17951797
LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition);
17961798
}
1797-
return info;
17981799
} finally {
17991800
olapTable.writeUnlock();
18001801
}
@@ -1840,7 +1841,7 @@ public void addMultiPartitions(Database db, String tableName, AlterMultiPartitio
18401841
for (SinglePartitionDesc singlePartitionDesc : singlePartitionDescs) {
18411842
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, null,
18421843
multiPartitionClause.getProperties(), false);
1843-
addPartition(db, tableName, addPartitionClause, false, 0, true);
1844+
addPartition(db, tableName, addPartitionClause, false, 0, true, null);
18441845
}
18451846
}
18461847

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc)
404404
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
405405
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
406406
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause,
407-
false, 0, true);
407+
false, 0, true, null);
408408
}
409409

410410
/**

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1679,14 +1679,29 @@ public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
16791679
}
16801680

16811681
public long logAddPartition(PartitionPersistInfo info) {
1682+
if (DebugPointUtil.isEnable("FE.logAddPartition.slow")) {
1683+
DebugPointUtil.DebugPoint debugPoint = DebugPointUtil.getDebugPoint("FE.logAddPartition.slow");
1684+
String pName = debugPoint.param("pName", "");
1685+
if (info.getPartition().getName().equals(pName)) {
1686+
int sleepMs = debugPoint.param("sleep", 1000);
1687+
LOG.info("logAddPartition debug point hit, pName {}, sleep {} s", pName, sleepMs);
1688+
try {
1689+
Thread.sleep(sleepMs);
1690+
} catch (InterruptedException e) {
1691+
LOG.warn("sleep interrupted", e);
1692+
}
1693+
}
1694+
}
16821695
long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
1696+
LOG.info("log add partition, logId:{}, info: {}", logId, info.toJson());
16831697
AddPartitionRecord record = new AddPartitionRecord(logId, info);
16841698
Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
16851699
return logId;
16861700
}
16871701

16881702
public long logDropPartition(DropPartitionInfo info) {
16891703
long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
1704+
LOG.info("log drop partition, logId:{}, info: {}", logId, info.toJson());
16901705
Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
16911706
return logId;
16921707
}
@@ -1697,6 +1712,7 @@ public void logErasePartition(long partitionId) {
16971712

16981713
public void logRecoverPartition(RecoverInfo info) {
16991714
long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
1715+
LOG.info("log recover partition, logId:{}, info: {}", logId, info.toJson());
17001716
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
17011717
}
17021718

@@ -1715,6 +1731,7 @@ public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {
17151731

17161732
public void logDropTable(DropInfo info) {
17171733
long logId = logEdit(OperationType.OP_DROP_TABLE, info);
1734+
LOG.info("log drop table, logId : {}, infos: {}", logId, info);
17181735
if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
17191736
DropTableRecord record = new DropTableRecord(logId, info);
17201737
Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
@@ -1727,11 +1744,13 @@ public void logEraseTable(long tableId) {
17271744

17281745
public void logRecoverTable(RecoverInfo info) {
17291746
long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
1747+
LOG.info("log recover table, logId : {}, infos: {}", logId, info);
17301748
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
17311749
}
17321750

17331751
public void logDropRollup(DropInfo info) {
17341752
long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
1753+
LOG.info("log drop rollup, logId : {}, infos: {}", logId, info);
17351754
Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
17361755
}
17371756

@@ -1848,7 +1867,8 @@ public void logDropRole(PrivInfo info) {
18481867
}
18491868

18501869
public void logDatabaseRename(DatabaseInfo databaseInfo) {
1851-
logEdit(OperationType.OP_RENAME_DB, databaseInfo);
1870+
long logId = logEdit(OperationType.OP_RENAME_DB, databaseInfo);
1871+
LOG.info("log database rename, logId : {}, infos: {}", logId, databaseInfo);
18521872
}
18531873

18541874
public void logTableRename(TableInfo tableInfo) {

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3785,7 +3785,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
37853785
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
37863786
try {
37873787
// here maybe check and limit created partitions num
3788-
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true);
3788+
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true, null);
37893789
} catch (DdlException e) {
37903790
LOG.warn(e);
37913791
errorStatus.setErrorMsgs(

regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy renamed to regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import org.codehaus.groovy.runtime.IOGroovyMethods
1919

20-
suite ("diffrent_serialize_cloud") {
20+
suite ("different_serialize_cloud") {
2121

2222
sql """ DROP TABLE IF EXISTS d_table; """
2323

0 commit comments

Comments
 (0)