Skip to content

Commit b2f72ed

Browse files
committed
add
Signed-off-by: gengjun-git <gengjun@starrocks.com>
1 parent 0a8df22 commit b2f72ed

20 files changed

+224
-96
lines changed

fe/fe-core/src/main/java/com/starrocks/load/loadv2/IVMInsertLoadTxnCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
9494
}
9595

9696
@Override
97-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
97+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
9898
if (CollectionUtils.sizeIsEmpty(this.baseTableInfoTvrDeltaMap)) {
9999
LOG.info("Materialized view {} has no base table info tvr version range to update, skip", mv.getName());
100100
return;

fe/fe-core/src/main/java/com/starrocks/load/loadv2/InsertLoadJob.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,10 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
270270
}
271271

272272
@Override
273-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
274-
if (!txnOperated) {
275-
return;
276-
}
273+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
277274
loadCommittedTimestamp = System.currentTimeMillis();
278275
if (txnCallback != null) {
279-
txnCallback.afterCommitted(txnState, txnOperated);
276+
txnCallback.afterCommitted(txnState);
280277
}
281278
}
282279

fe/fe-core/src/main/java/com/starrocks/load/loadv2/InsertLoadTxnCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface InsertLoadTxnCallback {
3333
/**
3434
* After transaction is committed, do some follow-up work.
3535
*/
36-
void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException;
36+
void afterCommitted(TransactionState txnState) throws StarRocksException;
3737
}

fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,10 +1115,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
11151115
}
11161116

11171117
@Override
1118-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
1119-
if (!txnOperated) {
1120-
return;
1121-
}
1118+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
11221119
writeLock();
11231120
try {
11241121
unprotectUpdateLoadingStatus(txnState);

fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,29 +1032,27 @@ private void executeBeforeCheck(TransactionState txnState, TransactionStatus tra
10321032
// paused job or renew task
10331033
// *** Please do not call after individually. It must be combined use with before ***
10341034
@Override
1035-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
1035+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
10361036
long taskBeId = -1L;
10371037
try {
1038-
if (txnOperated) {
1039-
// find task in job
1040-
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1041-
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1042-
if (routineLoadTaskInfoOptional.isPresent()) {
1043-
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1044-
taskBeId = routineLoadTaskInfo.getBeId();
1045-
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
1046-
routineLoadTaskInfo.afterCommitted(txnState, txnOperated);
1047-
}
1048-
++committedTaskNum;
1049-
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1050-
entity.counterRoutineLoadCommittedTasksTotal.increase(1L);
1051-
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
1038+
// find task in job
1039+
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1040+
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1041+
if (routineLoadTaskInfoOptional.isPresent()) {
1042+
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1043+
taskBeId = routineLoadTaskInfo.getBeId();
1044+
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
1045+
routineLoadTaskInfo.afterCommitted(txnState);
1046+
}
1047+
++committedTaskNum;
1048+
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1049+
entity.counterRoutineLoadCommittedTasksTotal.increase(1L);
1050+
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
10521051

1053-
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1054-
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1055-
if (streamLoadTask != null) {
1056-
streamLoadTask.afterCommitted(txnState, txnOperated);
1057-
}
1052+
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1053+
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1054+
if (streamLoadTask != null) {
1055+
streamLoadTask.afterCommitted(txnState);
10581056
}
10591057
} catch (Throwable e) {
10601058
LOG.warn("after committed failed", e);

fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,10 @@ public void beginTxn() throws Exception {
247247
timeoutMs / 1000, computeResource);
248248
}
249249

250-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
250+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
251251
// StreamLoadTask is null, if not specify session variable `enable_profile = true`
252252
if (streamLoadTask != null) {
253-
streamLoadTask.afterCommitted(txnState, txnOperated);
253+
streamLoadTask.afterCommitted(txnState);
254254
}
255255
}
256256

fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMultiStmtTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
448448
}
449449

450450
@Override
451-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
451+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
452452
for (StreamLoadTask task : taskMaps.values()) {
453-
task.afterCommitted(txnState, txnOperated);
453+
task.afterCommitted(txnState);
454454
}
455455
}
456456

fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,11 +1180,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
11801180
}
11811181

11821182
@Override
1183-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
1184-
if (!txnOperated) {
1185-
return;
1186-
}
1187-
1183+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
11881184
// sync stream load collect profile, here we collect profile only when be has reported
11891185
if (isSyncStreamLoad() && coord != null && coord.isProfileAlreadyReported()) {
11901186
collectProfile(false);

fe/fe-core/src/main/java/com/starrocks/transaction/AbstractTxnStateChangeCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void beforeAborted(TransactionState txnState) throws TransactionException
3131
}
3232

3333
@Override
34-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
34+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
3535

3636
}
3737

fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -490,69 +490,77 @@ public VisibleStateWaiter commitPreparedTransaction(long transactionId) throws S
490490

491491
Span txnSpan = transactionState.getTxnSpan();
492492
txnSpan.setAttribute("db", db.getFullName());
493-
StringBuilder tableListString = new StringBuilder();
494493
txnSpan.addEvent("commit_start");
495-
496-
for (Long tableId : transactionState.getTableIdList()) {
497-
Table table = globalStateMgr.getLocalMetastore().getTable(db.getId(), tableId);
498-
if (table == null) {
499-
// this can happen when tableId == -1 (tablet being dropping)
500-
// or table really not exist.
501-
continue;
502-
}
503-
if (tableListString.length() != 0) {
504-
tableListString.append(',');
505-
}
506-
tableListString.append(table.getName());
507-
}
508-
509-
txnSpan.setAttribute("tables", tableListString.toString());
494+
txnSpan.setAttribute("tables", buildTableListString(db, transactionState));
510495

511496
// before state transform
512497
transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
498+
// COW
499+
TransactionState copiedState = new TransactionState(transactionState);
500+
Span unprotectedCommitSpan = TraceManager.startSpan("unprotectedCommitPreparedTransaction", txnSpan);
501+
513502
// transaction state transform
514-
boolean txnOperated = false;
503+
boolean txnOperated = unprotectedCommitPreparedTransaction(copiedState, db);
504+
if (!txnOperated) {
505+
return null;
506+
}
515507

516-
Span unprotectedCommitSpan = TraceManager.startSpan("unprotectedCommitPreparedTransaction", txnSpan);
508+
persistTxnStateInTxnLevelLock(copiedState);
517509

518510
writeLock();
519511
try {
520-
txnOperated = unprotectedCommitPreparedTransaction(transactionState, db);
512+
unprotectUpsertTransactionState(copiedState);
521513
} finally {
522514
writeUnlock();
523-
int numPartitions = 0;
524-
for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
525-
numPartitions += entry.getValue().getIdToPartitionCommitInfo().size();
526-
}
527-
txnSpan.setAttribute("num_partition", numPartitions);
528-
unprotectedCommitSpan.end();
529-
// after state transform
530-
try {
531-
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated, null);
532-
} catch (Throwable t) {
533-
LOG.warn("transaction after state transform failed: {}", transactionState, t);
534-
}
535515
}
536-
if (!txnOperated) {
537-
return null;
516+
txnSpan.setAttribute("num_partition", calculateNumPartitions(copiedState));
517+
unprotectedCommitSpan.end();
518+
// after state transform
519+
try {
520+
copiedState.afterStateTransform(TransactionStatus.COMMITTED, true, null);
521+
} catch (Throwable t) {
522+
LOG.warn("transaction after state transform failed: {}", transactionState, t);
538523
}
539524

540-
persistTxnStateInTxnLevelLock(transactionState);
541-
542525
// 6. update nextVersion because of the failure of persistent transaction resulting in error version
543526
Span updateCatalogAfterCommittedSpan = TraceManager.startSpan("updateCatalogAfterCommitted", txnSpan);
544527
try {
545-
updateCatalogAfterCommitted(transactionState, db);
528+
updateCatalogAfterCommitted(copiedState, db);
546529
} finally {
547530
updateCatalogAfterCommittedSpan.end();
548531
}
549-
LOG.info("transaction:[{}] successfully committed", transactionState);
532+
LOG.info("transaction:[{}] successfully committed", copiedState);
550533
return waiter;
551534
} finally {
552535
transactionState.writeUnlock();
553536
}
554537
}
555538

539+
private String buildTableListString(Database db, TransactionState transactionState) {
540+
StringBuilder tableListString = new StringBuilder();
541+
for (Long tableId : transactionState.getTableIdList()) {
542+
Table table = globalStateMgr.getLocalMetastore().getTable(db.getId(), tableId);
543+
if (table == null) {
544+
// this can happen when tableId == -1 (tablet being dropping)
545+
// or table really not exist.
546+
continue;
547+
}
548+
if (!tableListString.isEmpty()) {
549+
tableListString.append(',');
550+
}
551+
tableListString.append(table.getName());
552+
}
553+
return tableListString.toString();
554+
}
555+
556+
private int calculateNumPartitions(TransactionState transactionState) {
557+
int numPartitions = 0;
558+
for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
559+
numPartitions += entry.getValue().getIdToPartitionCommitInfo().size();
560+
}
561+
return numPartitions;
562+
}
563+
556564
/**
557565
* Merge prepare and commit phases and automatically commit transactions
558566
*
@@ -1480,8 +1488,6 @@ protected boolean unprotectedCommitPreparedTransaction(TransactionState transact
14801488
}
14811489
}
14821490

1483-
// persist transactionState
1484-
unprotectUpsertTransactionState(transactionState);
14851491
return true;
14861492
}
14871493

0 commit comments

Comments
 (0)