Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
Expand Down Expand Up @@ -125,7 +125,10 @@ public abstract class AbstractDirectory<T> implements Directory<T> {

private volatile ScheduledFuture<?> connectivityCheckFuture;

private final ReentrantLock invokerRefreshLock = new ReentrantLock();
private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock();

private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = invokerRefreshLock.readLock();
private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = invokerRefreshLock.writeLock();

/**
* The max count of invokers for each reconnect task select to try to reconnect.
Expand Down Expand Up @@ -208,27 +211,35 @@ public List<Invoker<T>> list(Invocation invocation) throws RpcException {
BitList<Invoker<T>> availableInvokers;
SingleRouterChain<T> singleChain = null;
try {
if (routerChain != null) {
routerChain.getLock().readLock().lock();
}
boolean lockAcquired = false;
try {
if (routerChain != null) {
routerChain.getLock().readLock().lock();
if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) {
throw new RpcException("Failed to acquire read lock on invokerRefreshLock within timeout");
}
lockAcquired = true;
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers.clone();
}

if (routerChain != null) {
singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
singleChain.getLock().readLock().lock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RpcException(
"Interrupted while acquiring read lock for invoker access, cause: " + e.getMessage(), e);
} finally {
if (routerChain != null) {
routerChain.getLock().readLock().unlock();
if (lockAcquired) {
invokerRefreshReadLock.unlock();
}
}

if (routerChain != null) {
singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
singleChain.getLock().readLock().lock();
}
List<Invoker<T>> routedResult = doList(singleChain, availableInvokers, invocation);
if (routedResult.isEmpty()) {
// 2-2 - No provider available.
Expand All @@ -249,6 +260,9 @@ public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (singleChain != null) {
singleChain.getLock().readLock().unlock();
}
if (routerChain != null) {
routerChain.getLock().readLock().unlock();
}
}
}

Expand Down Expand Up @@ -298,7 +312,7 @@ public void discordAddresses() {

@Override
public void addInvalidateInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
// 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
if (removeValidInvoker(invoker)) {
// 2. add this invoker to reconnect list
Expand Down Expand Up @@ -329,7 +343,7 @@ public void checkConnectivity() {
// 1. pick invokers from invokersToReconnect
// limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor
// for long time
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokersToReconnect.size() < reconnectTaskTryCount) {
invokersToTry.addAll(invokersToReconnect);
} else {
Expand All @@ -348,7 +362,7 @@ public void checkConnectivity() {
// 2. try to check the invoker's status
for (Invoker<T> invoker : invokersToTry) {
AtomicBoolean invokerExist = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
invokerExist.set(invokers.contains(invoker));
});
// Should not lock here, `invoker.isAvailable` may need some time to check
Expand All @@ -362,7 +376,7 @@ public void checkConnectivity() {
}

// 3. recover valid invoker
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
Expand All @@ -388,7 +402,7 @@ public void checkConnectivity() {
}

// 4. submit new task if it has more to recover
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (!invokersToReconnect.isEmpty()) {
checkConnectivity();
}
Expand All @@ -411,7 +425,7 @@ public void checkConnectivity() {
* 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list
*/
public void refreshInvoker() {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokersInitialized) {
refreshInvokerInternal();
}
Expand Down Expand Up @@ -445,7 +459,7 @@ private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invo

@Override
public void addDisabledInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
Expand All @@ -458,7 +472,7 @@ public void addDisabledInvoker(Invoker<T> invoker) {

@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
Expand Down Expand Up @@ -526,7 +540,7 @@ public Set<Invoker<T>> getDisabledInvokers() {
}

protected void setInvokers(BitList<Invoker<T>> invokers) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
Expand All @@ -538,7 +552,7 @@ protected void setInvokers(BitList<Invoker<T>> invokers) {

protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
Expand All @@ -547,7 +561,7 @@ protected void destroyInvokers() {

private boolean addValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
result.set(this.validInvokers.add(invoker));
});
MetricsEventBus.publish(
Expand All @@ -557,7 +571,7 @@ private boolean addValidInvoker(Invoker<T> invoker) {

private boolean removeValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
result.set(this.validInvokers.remove(invoker));
});
MetricsEventBus.publish(
Expand Down
Loading