Skip to content

Commit dd4cf68

Browse files
committed
Pipe: Refactored the PipeTaskCoordinatorLock (#16988)
* may-fix * refactor * commit-sug
1 parent f9631fa commit dd4cf68

File tree

2 files changed

+17
-51
lines changed

2 files changed

+17
-51
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public class PipeTaskCoordinator {
5151
private final PipeTaskInfo pipeTaskInfo;
5252

5353
private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock;
54-
private AtomicReference<PipeTaskInfo> pipeTaskInfoHolder;
5554

5655
public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) {
5756
this.configManager = configManager;
@@ -66,12 +65,7 @@ public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInf
6665
* null if the lock is not acquired.
6766
*/
6867
public AtomicReference<PipeTaskInfo> tryLock() {
69-
if (pipeTaskCoordinatorLock.tryLock()) {
70-
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
71-
return pipeTaskInfoHolder;
72-
}
73-
74-
return null;
68+
return pipeTaskCoordinatorLock.tryLock() ? new AtomicReference<>(pipeTaskInfo) : null;
7569
}
7670

7771
/**
@@ -82,8 +76,7 @@ public AtomicReference<PipeTaskInfo> tryLock() {
8276
*/
8377
public AtomicReference<PipeTaskInfo> lock() {
8478
pipeTaskCoordinatorLock.lock();
85-
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
86-
return pipeTaskInfoHolder;
79+
return new AtomicReference<>(pipeTaskInfo);
8780
}
8881

8982
/**
@@ -94,11 +87,6 @@ public AtomicReference<PipeTaskInfo> lock() {
9487
* the lock.
9588
*/
9689
public boolean unlock() {
97-
if (pipeTaskInfoHolder != null) {
98-
pipeTaskInfoHolder.set(null);
99-
pipeTaskInfoHolder = null;
100-
}
101-
10290
try {
10391
pipeTaskCoordinatorLock.unlock();
10492
return true;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25-
import java.util.concurrent.BlockingDeque;
26-
import java.util.concurrent.LinkedBlockingDeque;
2725
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.concurrent.locks.ReentrantLock;
2927

3028
/**
3129
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
@@ -35,22 +33,16 @@ public class PipeTaskCoordinatorLock {
3533

3634
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
3735

38-
private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
39-
private final AtomicLong idGenerator = new AtomicLong(0);
36+
private final ReentrantLock lock = new ReentrantLock();
4037

4138
public void lock() {
39+
LOGGER.debug(
40+
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
4241
try {
43-
final long id = idGenerator.incrementAndGet();
42+
lock.lockInterruptibly();
4443
LOGGER.debug(
45-
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
46-
id,
47-
Thread.currentThread().getName());
48-
deque.put(id);
49-
LOGGER.debug(
50-
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
51-
id,
52-
Thread.currentThread().getName());
53-
} catch (InterruptedException e) {
44+
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
45+
} catch (final InterruptedException e) {
5446
Thread.currentThread().interrupt();
5547
LOGGER.error(
5648
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
@@ -60,21 +52,15 @@ public void lock() {
6052

6153
public boolean tryLock() {
6254
try {
63-
final long id = idGenerator.incrementAndGet();
6455
LOGGER.debug(
65-
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
66-
id,
67-
Thread.currentThread().getName());
68-
if (deque.offer(id, 10, TimeUnit.SECONDS)) {
56+
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
57+
if (lock.tryLock(10, TimeUnit.SECONDS)) {
6958
LOGGER.debug(
70-
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
71-
id,
72-
Thread.currentThread().getName());
59+
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
7360
return true;
7461
} else {
7562
LOGGER.info(
76-
"PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout",
77-
id,
63+
"PipeTaskCoordinator lock failed to acquire by thread {} because of timeout",
7864
Thread.currentThread().getName());
7965
return false;
8066
}
@@ -88,20 +74,12 @@ public boolean tryLock() {
8874
}
8975

9076
public void unlock() {
91-
final Long id = deque.poll();
92-
if (id == null) {
93-
LOGGER.error(
94-
"PipeTaskCoordinator lock released by thread {} but the lock is not acquired by any thread",
95-
Thread.currentThread().getName());
96-
} else {
97-
LOGGER.debug(
98-
"PipeTaskCoordinator lock (id: {}) released by thread {}",
99-
id,
100-
Thread.currentThread().getName());
101-
}
77+
lock.unlock();
78+
LOGGER.debug(
79+
"PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
10280
}
10381

10482
public boolean isLocked() {
105-
return !deque.isEmpty();
83+
return lock.isLocked();
10684
}
10785
}

0 commit comments

Comments
 (0)