Skip to content

Commit 98d5c55

Browse files
committed
[fix](delete) Delete should count down latch and clear an agent task when failed (apache#57428)
1 parent be1b122 commit 98d5c55

File tree

3 files changed

+42
-10
lines changed

3 files changed

+42
-10
lines changed

fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.doris.cloud.master.CloudReportHandler;
3434
import org.apache.doris.common.Config;
3535
import org.apache.doris.common.MetaNotFoundException;
36+
import org.apache.doris.common.Status;
3637
import org.apache.doris.load.DeleteJob;
3738
import org.apache.doris.load.loadv2.IngestionLoadJob;
3839
import org.apache.doris.system.Backend;
@@ -319,18 +320,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
319320
long backendId = pushTask.getBackendId();
320321
long signature = task.getSignature();
321322
long transactionId = ((PushTask) task).getTransactionId();
323+
long tableId = pushTask.getTableId();
324+
long partitionId = pushTask.getPartitionId();
325+
long pushIndexId = pushTask.getIndexId();
326+
long pushTabletId = pushTask.getTabletId();
322327

323328
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
324329
if (pushTask.getPushType() == TPushType.DELETE) {
325330
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
326331
// or DELETE_INVALID_PARAMETERS
327332
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
328-
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
329-
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
330-
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
331-
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
332-
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
333+
TStatus taskStatus = request.getTaskStatus();
334+
String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString();
335+
LOG.warn("finish push replica, signature={}, error: {}",
336+
signature, taskStatus.getErrorMsgs().toString());
337+
if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) {
338+
// retry if obtain lock failed
339+
return;
340+
}
341+
if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
342+
pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
343+
} else {
344+
pushTask.countDownLatchWithStatus(backendId, pushTabletId,
345+
new Status(taskStatus.getStatusCode(), msg));
333346
}
347+
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
334348
}
335349
return;
336350
}
@@ -344,10 +358,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
344358
return;
345359
}
346360

347-
long tableId = pushTask.getTableId();
348-
long partitionId = pushTask.getPartitionId();
349-
long pushIndexId = pushTask.getIndexId();
350-
long pushTabletId = pushTask.getTabletId();
351361
// push finish type:
352362
// numOfFinishTabletInfos tabletId schemaHash
353363
// Normal: 1 / /
@@ -445,7 +455,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
445455
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
446456
LOG.warn("finish push replica error", e);
447457
if (pushTask.getPushType() == TPushType.DELETE) {
448-
pushTask.countDownLatch(backendId, pushTabletId);
458+
pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED);
449459
}
450460
} finally {
451461
olapTable.writeUnlock();

fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,18 @@ public void countDownLatch(long backendId, long tabletId) {
220220
}
221221
}
222222

223+
/**
 * Counts down this task's latch for one (backend, tablet) mark, passing a status along with it.
 *
 * <p>Does nothing when no latch is attached to this task. Otherwise the call is delegated to
 * {@code latch.markedCountDownWithStatus(backendId, tabletId, st)}; the remaining latch count is
 * logged at debug level only when that call returns {@code true}.
 *
 * @param backendId id of the backend, forwarded to the latch as part of the mark
 * @param tabletId id of the tablet, forwarded to the latch as part of the mark
 * @param st status forwarded to the latch together with the count down
 */
public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
224+
// No latch attached: there is nothing to count down.
if (this.latch == null) {
225+
return;
226+
}
227+
// Log only when the latch call returns true.
// NOTE(review): presumably true means the mark was found and counted down - confirm in the latch class.
if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
228+
if (LOG.isDebugEnabled()) {
229+
LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
230+
latch.getCount(), backendId, tabletId, st);
231+
}
232+
}
233+
}
234+
223235
// Calling this always means that one of the tasks has failed: count down to zero to finish the entire task.
224236
public void countDownToZero(TStatusCode code, String errMsg) {
225237
if (this.latch != null) {

regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {
4646

4747
GetDebugPoint().clearDebugPointsForAllBEs()
4848

49+
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
50+
[error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"])
51+
52+
test {
53+
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
54+
exception "too many versions"
55+
}
56+
57+
GetDebugPoint().clearDebugPointsForAllBEs()
58+
4959
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
5060

5161
def t1 = Thread.start {

0 commit comments

Comments
 (0)