Skip to content

Commit 01e0364

Browse files
committed
add
Signed-off-by: gengjun-git <gengjun@starrocks.com>
1 parent 1851e00 commit 01e0364

32 files changed

+603
-468
lines changed

fe/fe-core/src/main/java/com/starrocks/load/DeleteJob.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,7 @@ public void setDeleteConditions(List<Predicate> deleteConditions) {
116116
}
117117

118118
@Override
119-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
120-
if (!txnOperated) {
121-
return;
122-
}
123-
119+
public void afterVisible(TransactionState txnState) {
124120
GlobalStateMgr.getCurrentState().getEditLog().logFinishMultiDelete(deleteInfo, wal -> {
125121
setState(DeleteState.FINISHED);
126122
GlobalStateMgr.getCurrentState().getDeleteMgr().recordFinishedJob(this);
@@ -129,7 +125,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
129125
}
130126

131127
@Override
132-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
128+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason) {
133129
// just to clean the callback
134130
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
135131
}

fe/fe-core/src/main/java/com/starrocks/load/batchwrite/MergeCommitTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,7 @@ public long getId() {
795795
}
796796

797797
@Override
798-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
798+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason)
799799
throws StarRocksException {
800800
// This transaction abort must come from outside, because run() removes the callback before abort txn.
801801
cancel(txnStatusChangeReason);

fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
344344
}
345345

346346
@Override
347-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
348-
if (!txnOperated) {
349-
return;
350-
}
347+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason) {
351348
writeLock();
352349
try {
353350
// check if job has been completed
@@ -388,8 +385,8 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
388385
}
389386

390387
@Override
391-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
392-
super.afterVisible(txnState, txnOperated);
388+
public void afterVisible(TransactionState txnState) {
389+
super.afterVisible(txnState);
393390
WarehouseIdleChecker.updateJobLastFinishTime(warehouseId, "BrokerLoad: jobId[" + id + "] label[" + label + "]");
394391
}
395392

fe/fe-core/src/main/java/com/starrocks/load/loadv2/IVMInsertLoadTxnCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
9494
}
9595

9696
@Override
97-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
97+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
9898
if (CollectionUtils.sizeIsEmpty(this.baseTableInfoTvrDeltaMap)) {
9999
LOG.info("Materialized view {} has no base table info tvr version range to update, skip", mv.getName());
100100
return;

fe/fe-core/src/main/java/com/starrocks/load/loadv2/InsertLoadJob.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,10 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
270270
}
271271

272272
@Override
273-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
274-
if (!txnOperated) {
275-
return;
276-
}
273+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
277274
loadCommittedTimestamp = System.currentTimeMillis();
278275
if (txnCallback != null) {
279-
txnCallback.afterCommitted(txnState, txnOperated);
276+
txnCallback.afterCommitted(txnState);
280277
}
281278
}
282279

@@ -285,15 +282,15 @@ public void replayOnCommitted(TransactionState txnState) {
285282
}
286283

287284
@Override
288-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
285+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason) {
289286
}
290287

291288
@Override
292289
public void replayOnAborted(TransactionState txnState) {
293290
}
294291

295292
@Override
296-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
293+
public void afterVisible(TransactionState txnState) {
297294
}
298295

299296
@Override

fe/fe-core/src/main/java/com/starrocks/load/loadv2/InsertLoadTxnCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface InsertLoadTxnCallback {
3333
/**
3434
* After transaction is committed, do some follow-up work.
3535
*/
36-
void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException;
36+
void afterCommitted(TransactionState txnState) throws StarRocksException;
3737
}

fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,10 +1158,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
11581158
}
11591159

11601160
@Override
1161-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
1162-
if (!txnOperated) {
1163-
return;
1164-
}
1161+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
11651162
writeLock();
11661163
try {
11671164
unprotectUpdateLoadingStatus(txnState);
@@ -1192,14 +1189,10 @@ public void replayOnCommitted(TransactionState txnState) {
11921189
* The job will be cancelled by replayOnAborted when journal replay
11931190
*
11941191
* @param txnState
1195-
* @param txnOperated
11961192
* @param txnStatusChangeReason
11971193
*/
11981194
@Override
1199-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
1200-
if (!txnOperated) {
1201-
return;
1202-
}
1195+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason) {
12031196
writeLock();
12041197
try {
12051198
if (isTxnDone()) {
@@ -1239,13 +1232,9 @@ public void replayOnAborted(TransactionState txnState) {
12391232
* The job will be finished by replayOnVisible when txn journal replay
12401233
*
12411234
* @param txnState
1242-
* @param txnOperated
12431235
*/
12441236
@Override
1245-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
1246-
if (!txnOperated) {
1247-
return;
1248-
}
1237+
public void afterVisible(TransactionState txnState) {
12491238
GlobalStateMgr.getCurrentState().getOperationListenerBus().onLoadJobTransactionFinish(txnState);
12501239
unprotectUpdateLoadingStatus(txnState);
12511240
updateState(JobState.FINISHED);

fe/fe-core/src/main/java/com/starrocks/load/loadv2/SparkLoadJob.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -841,8 +841,8 @@ private void clearJob() {
841841
}
842842

843843
@Override
844-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
845-
super.afterVisible(txnState, txnOperated);
844+
public void afterVisible(TransactionState txnState) {
845+
super.afterVisible(txnState);
846846
// collect table-level metrics after spark load job finished
847847
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
848848
if (null == db) {
@@ -866,8 +866,8 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
866866
}
867867

868868
@Override
869-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
870-
super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
869+
public void afterAborted(TransactionState txnState, String txnStatusChangeReason) {
870+
super.afterAborted(txnState, txnStatusChangeReason);
871871
WarehouseIdleChecker.updateJobLastFinishTime(warehouseId, "SparkLoad: id[" + id + "] label[" + label + "]");
872872
}
873873

fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -872,8 +872,8 @@ public void updateSubstate() throws StarRocksException {
872872
}
873873

874874
@Override
875-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
876-
super.afterVisible(txnState, txnOperated);
875+
public void afterVisible(TransactionState txnState) {
876+
super.afterVisible(txnState);
877877
// Update lag time metrics when Kafka transaction becomes visible
878878
if (Config.enable_routine_load_lag_time_metrics) {
879879
updateLagTimeMetricsFromProgress();

fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java

Lines changed: 70 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,29 +1032,27 @@ private void executeBeforeCheck(TransactionState txnState, TransactionStatus tra
10321032
// paused job or renew task
10331033
// *** Please do not call after individually. It must be combined use with before ***
10341034
@Override
1035-
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
1035+
public void afterCommitted(TransactionState txnState) throws StarRocksException {
10361036
long taskBeId = -1L;
10371037
try {
1038-
if (txnOperated) {
1039-
// find task in job
1040-
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1041-
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1042-
if (routineLoadTaskInfoOptional.isPresent()) {
1043-
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1044-
taskBeId = routineLoadTaskInfo.getBeId();
1045-
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
1046-
routineLoadTaskInfo.afterCommitted(txnState, txnOperated);
1047-
}
1048-
++committedTaskNum;
1049-
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1050-
entity.counterRoutineLoadCommittedTasksTotal.increase(1L);
1051-
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
1052-
1053-
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1054-
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1055-
if (streamLoadTask != null) {
1056-
streamLoadTask.afterCommitted(txnState, txnOperated);
1057-
}
1038+
// find task in job
1039+
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1040+
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1041+
if (routineLoadTaskInfoOptional.isPresent()) {
1042+
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1043+
taskBeId = routineLoadTaskInfo.getBeId();
1044+
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
1045+
routineLoadTaskInfo.afterCommitted(txnState);
1046+
}
1047+
++committedTaskNum;
1048+
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1049+
entity.counterRoutineLoadCommittedTasksTotal.increase(1L);
1050+
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
1051+
1052+
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1053+
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1054+
if (streamLoadTask != null) {
1055+
streamLoadTask.afterCommitted(txnState);
10581056
}
10591057
} catch (Throwable e) {
10601058
LOG.warn("after committed failed", e);
@@ -1086,19 +1084,7 @@ public void replayOnCommitted(TransactionState txnState) {
10861084
* the corresponding txn is visible, create a new task
10871085
*/
10881086
@Override
1089-
public void afterVisible(TransactionState txnState, boolean txnOperated) {
1090-
if (!txnOperated) {
1091-
String msg = String.format(
1092-
"should not happen, we find that txnOperated if false when handling afterVisble. job id: %d, txn_id: %d",
1093-
id, txnState.getTransactionId());
1094-
LOG.warn(msg);
1095-
// print a log and return.
1096-
// if this really happen, the job will be blocked, and this task can be seen by
1097-
// "show routine load task" stmt, which is in COMMITTED state for a long time.
1098-
// so we can find this error and step in.
1099-
return;
1100-
}
1101-
1087+
public void afterVisible(TransactionState txnState) {
11021088
writeLock();
11031089
try {
11041090
if (state != JobState.RUNNING) {
@@ -1109,7 +1095,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
11091095
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
11101096
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
11111097
if (streamLoadTask != null) {
1112-
streamLoadTask.afterVisible(txnState, txnOperated);
1098+
streamLoadTask.afterVisible(txnState);
11131099
}
11141100

11151101
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
@@ -1144,7 +1130,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
11441130
}
11451131

11461132
try {
1147-
routineLoadTaskInfo.afterVisible(txnState, txnOperated);
1133+
routineLoadTaskInfo.afterVisible(txnState);
11481134
} catch (StarRocksException e) {
11491135
LOG.warn("failed to execute 'routineLoadTaskInfo.afterVisible', txnId {}, label {}. " +
11501136
"this should not happen", txnState.getTransactionId(), routineLoadTaskInfo.getLabel());
@@ -1177,65 +1163,63 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
11771163
// progress will be update otherwise the progress will be hung
11781164
// *** Please do not call after individually. It must be combined use with before ***
11791165
@Override
1180-
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReasonString)
1166+
public void afterAborted(TransactionState txnState, String txnStatusChangeReasonString)
11811167
throws StarRocksException {
11821168
long taskBeId = -1L;
11831169
try {
1184-
if (txnOperated) {
1185-
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1186-
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1187-
if (streamLoadTask != null) {
1188-
streamLoadTask.afterAborted(txnState, txnOperated, txnStatusChangeReasonString);
1189-
}
1170+
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().getStreamLoadMgr().
1171+
getSyncSteamLoadTaskByTxnId(txnState.getTransactionId());
1172+
if (streamLoadTask != null) {
1173+
streamLoadTask.afterAborted(txnState, txnStatusChangeReasonString);
1174+
}
11901175

1191-
// step0: find task in job
1192-
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1193-
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1194-
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1195-
if (!routineLoadTaskInfoOptional.isPresent()) {
1196-
// The task of the timed-out transaction will be detected by the transaction checker thread
1197-
// and subsequently aborted. Here, we need to update the abortedTaskNum.
1198-
++abortedTaskNum;
1199-
entity.counterRoutineLoadAbortedTasksTotal.increase(1L);
1200-
// task will not be update when task has been aborted by fe
1201-
return;
1202-
}
1203-
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1204-
taskBeId = routineLoadTaskInfo.getBeId();
1205-
// step1: job state will be changed depending on txnStatusChangeReasonString
1206-
if (LOG.isDebugEnabled()) {
1207-
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel())
1208-
.add("txn_id", txnState.getTransactionId())
1209-
.add("msg", "txn abort with reason " + txnStatusChangeReasonString)
1210-
.build());
1211-
}
1212-
routineLoadTaskInfo.afterAborted(txnState, txnOperated, txnStatusChangeReasonString);
1176+
// step0: find task in job
1177+
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
1178+
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
1179+
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
1180+
if (!routineLoadTaskInfoOptional.isPresent()) {
1181+
// The task of the timed-out transaction will be detected by the transaction checker thread
1182+
// and subsequently aborted. Here, we need to update the abortedTaskNum.
12131183
++abortedTaskNum;
12141184
entity.counterRoutineLoadAbortedTasksTotal.increase(1L);
1215-
setOtherMsg(txnStatusChangeReasonString);
1216-
TxnStatusChangeReason txnStatusChangeReason = null;
1217-
if (txnStatusChangeReasonString != null) {
1218-
txnStatusChangeReason =
1219-
TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
1220-
if (txnStatusChangeReason != null) {
1221-
switch (txnStatusChangeReason) {
1222-
case OFFSET_OUT_OF_RANGE:
1223-
case PAUSE:
1224-
String msg = "be " + taskBeId + " abort task "
1225-
+ "with reason: " + txnStatusChangeReasonString;
1226-
updateState(JobState.PAUSED,
1227-
new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg));
1228-
return;
1229-
default:
1230-
break;
1231-
}
1185+
// task will not be update when task has been aborted by fe
1186+
return;
1187+
}
1188+
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
1189+
taskBeId = routineLoadTaskInfo.getBeId();
1190+
// step1: job state will be changed depending on txnStatusChangeReasonString
1191+
if (LOG.isDebugEnabled()) {
1192+
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel())
1193+
.add("txn_id", txnState.getTransactionId())
1194+
.add("msg", "txn abort with reason " + txnStatusChangeReasonString)
1195+
.build());
1196+
}
1197+
routineLoadTaskInfo.afterAborted(txnState, txnStatusChangeReasonString);
1198+
++abortedTaskNum;
1199+
entity.counterRoutineLoadAbortedTasksTotal.increase(1L);
1200+
setOtherMsg(txnStatusChangeReasonString);
1201+
TxnStatusChangeReason txnStatusChangeReason = null;
1202+
if (txnStatusChangeReasonString != null) {
1203+
txnStatusChangeReason =
1204+
TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
1205+
if (txnStatusChangeReason != null) {
1206+
switch (txnStatusChangeReason) {
1207+
case OFFSET_OUT_OF_RANGE:
1208+
case PAUSE:
1209+
String msg = "be " + taskBeId + " abort task "
1210+
+ "with reason: " + txnStatusChangeReasonString;
1211+
updateState(JobState.PAUSED,
1212+
new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg));
1213+
return;
1214+
default:
1215+
break;
12321216
}
1233-
// TODO(ml): use previous be id depend on change reason
12341217
}
1235-
// step2: commit task , update progress, maybe create a new task
1236-
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED,
1237-
txnStatusChangeReasonString);
1218+
// TODO(ml): use previous be id depend on change reason
12381219
}
1220+
// step2: commit task , update progress, maybe create a new task
1221+
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED,
1222+
txnStatusChangeReasonString);
12391223
} catch (Exception e) {
12401224
String msg =
12411225
"be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();

0 commit comments

Comments
 (0)