Skip to content

Commit 121d7d6

Browse files
committed
add
Signed-off-by: gengjun-git <gengjun@starrocks.com>
1 parent e885f8d commit 121d7d6

File tree

6 files changed

+346
-41
lines changed

6 files changed

+346
-41
lines changed

fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,13 @@ public VisibleStateWaiter commitPreparedTransaction(long transactionId) throws S
501501
Span unprotectedCommitSpan = TraceManager.startSpan("unprotectedCommitPreparedTransaction", txnSpan);
502502

503503
// transaction state transform
504-
boolean txnOperated = unprotectedCommitPreparedTransaction(copiedState, db);
504+
boolean txnOperated;
505+
writeLock();
506+
try {
507+
txnOperated = unprotectedCommitPreparedTransaction(copiedState, db);
508+
} finally {
509+
writeUnlock();
510+
}
505511
if (!txnOperated) {
506512
return null;
507513
}
@@ -1115,7 +1121,7 @@ public boolean canTxnFinished(TransactionState txn, Set<Long> errReplicas, Set<L
11151121
return true;
11161122
}
11171123

1118-
public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, long lockTimeoutMs)
1124+
public TransactionState finishTransaction(long transactionId, Set<Long> errorReplicaIds, long lockTimeoutMs)
11191125
throws StarRocksException {
11201126
TransactionState transactionState = getTransactionState(transactionId);
11211127
// add all commit errors and publish errors to a single set
@@ -1145,7 +1151,7 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, lon
11451151
}
11461152
});
11471153

1148-
return;
1154+
return copiedState;
11491155
} finally {
11501156
transactionState.writeUnlock();
11511157
}
@@ -1224,8 +1230,10 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, lon
12241230
"wait for publishing partition %d version %d. self version: %d. table %d",
12251231
physicalPartitionId, physicalPartition.getVisibleVersion() + 1,
12261232
partitionCommitInfo.getVersion(), tableId);
1227-
copiedState.setErrorMsg(errMsg);
1228-
return;
1233+
// set errMsg to transactionState instead of copiedState,
1234+
// because copiedState will not be upserted in this case.
1235+
transactionState.setErrorMsg(errMsg);
1236+
return transactionState;
12291237
}
12301238

12311239
if (table.isCloudNativeTableOrMaterializedView()) {
@@ -1309,7 +1317,9 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, lon
13091317
tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId,
13101318
physicalPartitionId,
13111319
physicalPartition.getVisibleVersion() + 1);
1312-
copiedState.setErrorMsg(errMsg);
1320+
// set errMsg to transactionState instead of copiedState,
1321+
// because copiedState will not be upserted in this case.
1322+
transactionState.setErrorMsg(errMsg);
13131323
hasError = true;
13141324
}
13151325

@@ -1344,7 +1354,7 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, lon
13441354
"version not equal to partition commit version or commit version - 1 if it's not a " +
13451355
"upgrade stage, its a fatal error. ",
13461356
copiedState);
1347-
return;
1357+
return transactionState;
13481358
}
13491359
copiedState.setErrorReplicas(errorReplicaIds);
13501360
copiedState.setFinishTime(System.currentTimeMillis());
@@ -1397,6 +1407,7 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds, lon
13971407
GlobalStateMgr.getCurrentState().getLocalMetastore().handleMVRepair(copiedState);
13981408
LOG.info("finish transaction {} successfully", copiedState);
13991409
updateTransactionMetrics(copiedState);
1410+
return copiedState;
14001411
}
14011412

14021413
protected boolean unprotectedCommitPreparedTransaction(TransactionState transactionState, Database db) {
@@ -1405,7 +1416,7 @@ protected boolean unprotectedCommitPreparedTransaction(TransactionState transact
14051416
return false;
14061417
}
14071418
// commit timestamps needs to be strictly monotonically increasing
1408-
long commitTs = Math.max(System.currentTimeMillis(), maxCommitTs + 1);
1419+
long commitTs = reserveCommitTs();
14091420
transactionState.setCommitTime(commitTs);
14101421
// update transaction state version
14111422
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
@@ -1518,6 +1529,13 @@ protected boolean unprotectedCommitPreparedTransaction(TransactionState transact
15181529
return true;
15191530
}
15201531

1532+
private long reserveCommitTs() {
1533+
Preconditions.checkState(transactionLock.isWriteLockedByCurrentThread());
1534+
long commitTs = Math.max(System.currentTimeMillis(), maxCommitTs + 1);
1535+
maxCommitTs = commitTs;
1536+
return commitTs;
1537+
}
1538+
15211539
// for add/update/delete TransactionState
15221540
protected void unprotectUpsertTransactionState(TransactionState transactionState) {
15231541
// it's OK if getCommitTime() returns -1
@@ -2052,7 +2070,7 @@ GlobalStateMgr getGlobalStateMgr() {
20522070
return globalStateMgr;
20532071
}
20542072

2055-
public void finishTransactionNew(TransactionState transactionState, Set<Long> publishErrorReplicas)
2073+
public TransactionState finishTransactionNew(TransactionState transactionState, Set<Long> publishErrorReplicas)
20562074
throws StarRocksException {
20572075
Database db = globalStateMgr.getLocalMetastore().getDb(transactionState.getDbId());
20582076
if (db == null) {
@@ -2072,7 +2090,7 @@ public void finishTransactionNew(TransactionState transactionState, Set<Long> pu
20722090
}
20732091
});
20742092

2075-
return;
2093+
return copiedState;
20762094
} finally {
20772095
transactionState.writeUnlock();
20782096
}
@@ -2133,6 +2151,7 @@ public void finishTransactionNew(TransactionState transactionState, Set<Long> pu
21332151
GlobalStateMgr.getCurrentState().getLocalMetastore().handleMVRepair(copiedState);
21342152
LOG.info("finish transaction {} successfully", copiedState);
21352153
updateTransactionMetrics(copiedState);
2154+
return copiedState;
21362155
}
21372156

21382157
// only for test
@@ -2194,7 +2213,7 @@ private boolean isTxnStateBatchConsistent(Database db, TransactionStateBatch sta
21942213
}
21952214

21962215

2197-
public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> errorReplicaIds) {
2216+
public TransactionStateBatch finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> errorReplicaIds) {
21982217
Database db = globalStateMgr.getLocalMetastore().getDb(stateBatch.getDbId());
21992218
if (db == null) {
22002219
stateBatch.writeLock();
@@ -2211,7 +2230,7 @@ public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> e
22112230
writeUnlock();
22122231
}
22132232
});
2214-
return;
2233+
return copiedStateBatch;
22152234
} finally {
22162235
stateBatch.writeUnlock();
22172236
}
@@ -2231,7 +2250,7 @@ public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> e
22312250
copiedStateBatch = new TransactionStateBatch(stateBatch);
22322251
// check whether version is consistent
22332252
if (!isTxnStateBatchConsistent(db, copiedStateBatch)) {
2234-
return;
2253+
return stateBatch;
22352254
}
22362255

22372256
copiedStateBatch.setTransactionVisibleInfo();
@@ -2264,6 +2283,7 @@ public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> e
22642283
for (TransactionState transactionState : copiedStateBatch.getTransactionStates()) {
22652284
updateTransactionMetrics(transactionState);
22662285
}
2286+
return copiedStateBatch;
22672287
}
22682288

22692289
private void updateTransactionMetrics(TransactionState txnState) {

fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -620,27 +620,30 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) {
620620
* @param errorReplicaIds
621621
* @return
622622
*/
623-
public void finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds) throws StarRocksException {
623+
public TransactionState finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds)
624+
throws StarRocksException {
624625
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
625-
dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds, 0L);
626+
return dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds, 0L);
626627
}
627628

628-
public void finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds, long lockTimeoutMs)
629+
public TransactionState finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds,
630+
long lockTimeoutMs)
629631
throws StarRocksException {
630632
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
631-
dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds, lockTimeoutMs);
633+
return dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds, lockTimeoutMs);
632634
}
633635

634-
public void finishTransactionBatch(long dbId, TransactionStateBatch stateBatch, Set<Long> errorReplicaIds)
636+
public TransactionStateBatch finishTransactionBatch(long dbId, TransactionStateBatch stateBatch,
637+
Set<Long> errorReplicaIds)
635638
throws StarRocksException {
636639
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
637-
dbTransactionMgr.finishTransactionBatch(stateBatch, errorReplicaIds);
640+
return dbTransactionMgr.finishTransactionBatch(stateBatch, errorReplicaIds);
638641
}
639642

640-
public void finishTransactionNew(TransactionState txnState, Set<Long> publishErrorReplicas) throws
643+
public TransactionState finishTransactionNew(TransactionState txnState, Set<Long> publishErrorReplicas) throws
641644
StarRocksException {
642645
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnState.getDbId());
643-
dbTransactionMgr.finishTransactionNew(txnState, publishErrorReplicas);
646+
return dbTransactionMgr.finishTransactionNew(txnState, publishErrorReplicas);
644647
}
645648

646649
public boolean canTxnFinished(TransactionState txn, Set<Long> errReplicas,

fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ private void tryFinishTransaction(TransactionState transactionState) throws Star
387387
try {
388388
// Attempt to finish the transaction with a lock timeout. If it fails, it will be retried in the next cycle.
389389
// This approach prevents blocking subsequent transactions due to the current one.
390-
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
390+
transactionState = globalTransactionMgr.finishTransaction(transactionState.getDbId(),
391391
transactionState.getTransactionId(), publishErrorReplicaIds,
392392
Config.finish_transaction_default_lock_timeout_ms);
393393
} catch (StarRocksException exception) {
@@ -421,7 +421,7 @@ private void publishVersionNew(GlobalTransactionMgr globalTransactionMgr, List<T
421421
}
422422
try {
423423
if (transactionState.checkCanFinish()) {
424-
globalTransactionMgr.finishTransactionNew(transactionState, publishErrorReplicas);
424+
transactionState = globalTransactionMgr.finishTransactionNew(transactionState, publishErrorReplicas);
425425
}
426426
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
427427
transactionState.updateSendTaskTime();
@@ -847,9 +847,10 @@ private CompletableFuture<Void> publishLakeTransactionBatchAsync(TransactionStat
847847
// as soon as the publish task succeeds. Set readyToFinishTime here so that
848848
// publishCanFinishLatencyMs and publishAckLatencyMs are recorded correctly.
849849
states.forEach(TransactionState::setReadyToFinishTimeIfUnset);
850-
globalTransactionMgr.finishTransactionBatch(dbId, txnStateBatch, null);
850+
TransactionStateBatch latestStateBatch =
851+
globalTransactionMgr.finishTransactionBatch(dbId, txnStateBatch, null);
851852
// here create the job to drop txnLog, for the visibleVersion has been updated
852-
submitDeleteTxnLogJob(txnStateBatch);
853+
submitDeleteTxnLogJob(latestStateBatch);
853854
} catch (StarRocksException e) {
854855
throw new RuntimeException(e);
855856
}

0 commit comments

Comments
 (0)