From 9d489250c61a91e8cf2ff7d5c2edb7ced42fe6b0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:37:29 +0800 Subject: [PATCH 1/3] may-fix --- .../task/PipeTaskCoordinatorLock.java | 59 +++++-------------- 1 file changed, 15 insertions(+), 44 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index e57add9a00140..d3413fb2cfbf7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -22,10 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; /** * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to @@ -35,46 +33,27 @@ public class PipeTaskCoordinatorLock { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class); - private final BlockingDeque deque = new LinkedBlockingDeque<>(1); - private final AtomicLong idGenerator = new AtomicLong(0); + private final ReentrantLock lock = new ReentrantLock(); public void lock() { - try { - final long id = idGenerator.incrementAndGet(); - LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) waiting for thread {}", - id, - Thread.currentThread().getName()); - deque.put(id); - LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) acquired by thread {}", - id, - Thread.currentThread().getName()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error( - "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", - Thread.currentThread().getName()); - } + LOGGER.debug( + "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); + lock.lock(); + LOGGER.debug( + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); } public boolean tryLock() { try { - final long id = idGenerator.incrementAndGet(); LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) waiting for thread {}", - id, - Thread.currentThread().getName()); - if (deque.offer(id, 10, TimeUnit.SECONDS)) { + "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); + if (lock.tryLock(10, TimeUnit.SECONDS)) { LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) acquired by thread {}", - id, - Thread.currentThread().getName()); + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); return true; } else { LOGGER.info( - "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout", - id, + "PipeTaskCoordinator lock failed to acquire by thread {} because of timeout", Thread.currentThread().getName()); return false; } @@ -88,20 +67,12 @@ public boolean tryLock() { } public void unlock() { - final Long id = deque.poll(); - if (id == null) { - LOGGER.error( - "PipeTaskCoordinator lock released by thread {} but the lock is not acquired by any thread", - Thread.currentThread().getName()); - } else { - LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) released by thread {}", - id, - Thread.currentThread().getName()); - } + lock.unlock(); + LOGGER.debug( + "PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName()); } public boolean isLocked() { - return !deque.isEmpty(); + return lock.isLocked(); } } From 19750bd423aec303a16f1c249e45a33144d618ce Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 7 Jan 2026 11:14:59 +0800 Subject: [PATCH 2/3] refactor --- .../coordinator/task/PipeTaskCoordinator.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index c68c6ef6fa58a..c0b81c2e43cbb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -54,7 +54,6 @@ public class PipeTaskCoordinator { private final PipeTaskInfo pipeTaskInfo; private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock; - private AtomicReference pipeTaskInfoHolder; public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) { this.configManager = configManager; @@ -69,12 +68,7 @@ public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInf * null if the lock is not acquired. */ public AtomicReference tryLock() { - if (pipeTaskCoordinatorLock.tryLock()) { - pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return pipeTaskInfoHolder; - } - - return null; + return pipeTaskCoordinatorLock.tryLock() ? new AtomicReference<>(pipeTaskInfo) : null; } /** @@ -85,8 +79,7 @@ public AtomicReference tryLock() { */ public AtomicReference lock() { pipeTaskCoordinatorLock.lock(); - pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return pipeTaskInfoHolder; + return new AtomicReference<>(pipeTaskInfo); } /** @@ -97,11 +90,6 @@ public AtomicReference lock() { * the lock. */ public boolean unlock() { - if (pipeTaskInfoHolder != null) { - pipeTaskInfoHolder.set(null); - pipeTaskInfoHolder = null; - } - try { pipeTaskCoordinatorLock.unlock(); return true; From 494cfbdefaadc27f37cd2d183e7b911c55f1891e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 7 Jan 2026 14:14:01 +0800 Subject: [PATCH 3/3] commit-sug --- .../coordinator/task/PipeTaskCoordinatorLock.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index d3413fb2cfbf7..12b9261900491 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -38,9 +38,16 @@ public class PipeTaskCoordinatorLock { public void lock() { LOGGER.debug( "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); - lock.lock(); - LOGGER.debug( - "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); + try { + lock.lockInterruptibly(); + LOGGER.debug( + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error( + "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", + Thread.currentThread().getName()); + } } public boolean tryLock() {