Skip to content

Commit 19834e4

Browse files
authored
Pipe: Fixed the concurrency bug of stop / start (apache#16461)
1 parent db819be commit 19834e4

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,11 @@ public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeEx
225225
pipeRuntimeException.getMessage(),
226226
pipeRuntimeException);
227227

228-
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
229-
230228
// Quick stop all pipes locally if critical exception occurs,
231229
// no need to wait for the next heartbeat cycle.
232230
if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
233-
PipeDataNodeAgent.task().stopAllPipesWithCriticalException();
231+
PipeDataNodeAgent.task()
232+
.stopAllPipesWithCriticalExceptionAndTrackException(pipeTaskMeta, pipeRuntimeException);
234233
}
235234
}
236235

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
3131
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
3232
import org.apache.iotdb.commons.exception.IllegalPathException;
33+
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
3334
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
3435
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
3536
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
@@ -373,8 +374,10 @@ protected boolean dropPipe(final String pipeName) {
373374
return true;
374375
}
375376

376-
public void stopAllPipesWithCriticalException() {
377-
super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
377+
public void stopAllPipesWithCriticalExceptionAndTrackException(
378+
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
379+
super.stopAllPipesWithCriticalException(
380+
CONFIG.getDataNodeId(), pipeTaskMeta, pipeRuntimeException);
378381
}
379382

380383
///////////////////////// Heartbeat /////////////////////////

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,10 @@ private void startPipeTask(final int consensusGroupId, final PipeStaticMeta pipe
951951
* Using try lock method to prevent deadlock when stopping all pipes with critical exceptions and
952952
* {@link PipeTaskAgent#handlePipeMetaChanges(List)}} concurrently.
953953
*/
954-
protected void stopAllPipesWithCriticalException(final int currentNodeId) {
954+
protected void stopAllPipesWithCriticalException(
955+
final int currentNodeId,
956+
final PipeTaskMeta pipeTaskMeta,
957+
final PipeRuntimeException pipeRuntimeException) {
955958
// To avoid deadlock, we use a new thread to stop all pipes.
956959
CompletableFuture.runAsync(
957960
() -> {
@@ -960,6 +963,7 @@ protected void stopAllPipesWithCriticalException(final int currentNodeId) {
960963
while (true) {
961964
if (tryWriteLockWithTimeOut(5)) {
962965
try {
966+
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
963967
stopAllPipesWithCriticalExceptionInternal(currentNodeId);
964968
LOGGER.info("Stopped all pipes with critical exception.");
965969
return;

0 commit comments

Comments
 (0)