Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class PipeTaskCoordinator {
private final PipeTaskInfo pipeTaskInfo;

private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock;
private AtomicReference<PipeTaskInfo> pipeTaskInfoHolder;

public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) {
this.configManager = configManager;
Expand All @@ -69,12 +68,7 @@ public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInf
* null if the lock is not acquired.
*/
public AtomicReference<PipeTaskInfo> tryLock() {
if (pipeTaskCoordinatorLock.tryLock()) {
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
return pipeTaskInfoHolder;
}

return null;
return pipeTaskCoordinatorLock.tryLock() ? new AtomicReference<>(pipeTaskInfo) : null;
}

/**
Expand All @@ -85,8 +79,7 @@ public AtomicReference<PipeTaskInfo> tryLock() {
*/
public AtomicReference<PipeTaskInfo> lock() {
pipeTaskCoordinatorLock.lock();
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
return pipeTaskInfoHolder;
return new AtomicReference<>(pipeTaskInfo);
}

/**
Expand All @@ -97,11 +90,6 @@ public AtomicReference<PipeTaskInfo> lock() {
* the lock.
*/
public boolean unlock() {
if (pipeTaskInfoHolder != null) {
pipeTaskInfoHolder.set(null);
pipeTaskInfoHolder = null;
}

try {
pipeTaskCoordinatorLock.unlock();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
Expand All @@ -35,22 +33,16 @@ public class PipeTaskCoordinatorLock {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);

private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
private final AtomicLong idGenerator = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock();

public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
try {
final long id = idGenerator.incrementAndGet();
lock.lockInterruptibly();
LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
id,
Thread.currentThread().getName());
deque.put(id);
LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
} catch (InterruptedException e) {
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
Expand All @@ -60,21 +52,15 @@ public void lock() {

public boolean tryLock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
id,
Thread.currentThread().getName());
if (deque.offer(id, 10, TimeUnit.SECONDS)) {
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
if (lock.tryLock(10, TimeUnit.SECONDS)) {
LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
return true;
} else {
LOGGER.info(
"PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout",
id,
"PipeTaskCoordinator lock failed to acquire by thread {} because of timeout",
Thread.currentThread().getName());
return false;
}
Expand All @@ -88,20 +74,12 @@ public boolean tryLock() {
}

public void unlock() {
final Long id = deque.poll();
if (id == null) {
LOGGER.error(
"PipeTaskCoordinator lock released by thread {} but the lock is not acquired by any thread",
Thread.currentThread().getName());
} else {
LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) released by thread {}",
id,
Thread.currentThread().getName());
}
lock.unlock();
LOGGER.debug(
"PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
}

public boolean isLocked() {
return !deque.isEmpty();
return lock.isLocked();
}
}
Loading