diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index c68c6ef6fa58..c0b81c2e43cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -54,7 +54,6 @@ public class PipeTaskCoordinator { private final PipeTaskInfo pipeTaskInfo; private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock; - private AtomicReference pipeTaskInfoHolder; public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) { this.configManager = configManager; @@ -69,12 +68,7 @@ public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInf * null if the lock is not acquired. */ public AtomicReference tryLock() { - if (pipeTaskCoordinatorLock.tryLock()) { - pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return pipeTaskInfoHolder; - } - - return null; + return pipeTaskCoordinatorLock.tryLock() ? new AtomicReference<>(pipeTaskInfo) : null; } /** @@ -85,8 +79,7 @@ public AtomicReference tryLock() { */ public AtomicReference lock() { pipeTaskCoordinatorLock.lock(); - pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return pipeTaskInfoHolder; + return new AtomicReference<>(pipeTaskInfo); } /** @@ -97,11 +90,6 @@ public AtomicReference lock() { * the lock. */ public boolean unlock() { - if (pipeTaskInfoHolder != null) { - pipeTaskInfoHolder.set(null); - pipeTaskInfoHolder = null; - } - try { pipeTaskCoordinatorLock.unlock(); return true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index e57add9a0014..12b926190049 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -22,10 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; /** * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to @@ -35,22 +33,16 @@ public class PipeTaskCoordinatorLock { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class); - private final BlockingDeque deque = new LinkedBlockingDeque<>(1); - private final AtomicLong idGenerator = new AtomicLong(0); + private final ReentrantLock lock = new ReentrantLock(); public void lock() { + LOGGER.debug( + "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); try { - final long id = idGenerator.incrementAndGet(); + lock.lockInterruptibly(); LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) waiting for thread {}", - id, - Thread.currentThread().getName()); - deque.put(id); - LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) acquired by thread {}", - id, - Thread.currentThread().getName()); - } catch (InterruptedException e) { + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.error( "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", @@ -60,21 +52,15 @@ public void lock() { public boolean tryLock() { try { - final long id = idGenerator.incrementAndGet(); LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) waiting for thread {}", - id, - Thread.currentThread().getName()); - if (deque.offer(id, 10, TimeUnit.SECONDS)) { + "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); + if (lock.tryLock(10, TimeUnit.SECONDS)) { LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) acquired by thread {}", - id, - Thread.currentThread().getName()); + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); return true; } else { LOGGER.info( - "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout", - id, + "PipeTaskCoordinator lock failed to acquire by thread {} because of timeout", Thread.currentThread().getName()); return false; } @@ -88,20 +74,12 @@ public boolean tryLock() { } public void unlock() { - final Long id = deque.poll(); - if (id == null) { - LOGGER.error( - "PipeTaskCoordinator lock released by thread {} but the lock is not acquired by any thread", - Thread.currentThread().getName()); - } else { - LOGGER.debug( - "PipeTaskCoordinator lock (id: {}) released by thread {}", - id, - Thread.currentThread().getName()); - } + lock.unlock(); + LOGGER.debug( + "PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName()); } public boolean isLocked() { - return !deque.isEmpty(); + return lock.isLocked(); } }