@@ -342,6 +342,10 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
342342 long backendId = pushTask .getBackendId ();
343343 long signature = task .getSignature ();
344344 long transactionId = ((PushTask ) task ).getTransactionId ();
345+ long tableId = pushTask .getTableId ();
346+ long partitionId = pushTask .getPartitionId ();
347+ long pushIndexId = pushTask .getIndexId ();
348+ long pushTabletId = pushTask .getTabletId ();
345349
346350 if (request .getTaskStatus ().getStatusCode () != TStatusCode .OK ) {
347351 if (pushTask .getPushType () == TPushType .DELETE ) {
@@ -351,9 +355,12 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
351355 if (request .getTaskStatus ().getStatusCode () == TStatusCode .INVALID_ARGUMENT ) {
352356 pushTask .countDownToZero (request .getTaskStatus ().getStatusCode (),
353357 task .getBackendId () + ": " + request .getTaskStatus ().getErrorMsgs ().toString ());
354- AgentTaskQueue . removeTask ( backendId , TTaskType . REALTIME_PUSH , signature );
355- LOG . warn ( "finish push replica error: {}" , request . getTaskStatus (). getErrorMsgs (). toString () );
358+ } else {
359+ pushTask . countDownLatch ( backendId , pushTabletId );
356360 }
361+ AgentTaskQueue .removeTask (backendId , TTaskType .REALTIME_PUSH , signature );
362+ LOG .warn ("finish push replica, signature={}, error: {}" ,
363+ signature , request .getTaskStatus ().getErrorMsgs ().toString ());
357364 }
358365 return ;
359366 }
@@ -367,10 +374,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
367374 return ;
368375 }
369376
370- long tableId = pushTask .getTableId ();
371- long partitionId = pushTask .getPartitionId ();
372- long pushIndexId = pushTask .getIndexId ();
373- long pushTabletId = pushTask .getTabletId ();
374377 // push finish type:
375378 // numOfFinishTabletInfos tabletId schemaHash
376379 // Normal: 1 / /
0 commit comments