Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryNotifier;
import org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
import org.apache.dubbo.rpc.RpcException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -281,8 +283,7 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f
}
} else {
for (String serviceName : serviceNames) {
List<Instance> instances = new LinkedList<>();
instances.addAll(namingService.getAllInstancesWithoutSubscription(
List<Instance> instances = new LinkedList<>(namingService.getAllInstancesWithoutSubscription(
serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP)));
String serviceInterface = serviceName;
String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
Expand Down Expand Up @@ -311,10 +312,8 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f

/**
* Since 2.7.6 the legacy service name will be added to serviceNames to fix bug with
* https://github.com/apache/dubbo/issues/5442
* <a href="https://github.com/apache/dubbo/issues/5442">...</a>
*
* @param url
* @return
*/
private boolean isServiceNamesWithCompatibleMode(final URL url) {
return !isAdminProtocol(url) && createServiceName(url).isConcrete();
Expand Down Expand Up @@ -412,18 +411,15 @@ private Set<String> getServiceNames0(URL url) {

private Set<String> filterServiceNames(NacosServiceName serviceName) {
try {
Set<String> serviceNames = new LinkedHashSet<>();
serviceNames.addAll(
namingService
.getServicesOfServer(1, Integer.MAX_VALUE, getUrl().getGroup(Constants.DEFAULT_GROUP))
.getData()
.stream()
.filter(this::isConformRules)
.map(NacosServiceName::new)
.filter(serviceName::isCompatible)
.map(NacosServiceName::toString)
.collect(Collectors.toList()));
return serviceNames;
return namingService
.getServicesOfServer(1, Integer.MAX_VALUE, getUrl().getGroup(Constants.DEFAULT_GROUP))
.getData()
.stream()
.filter(this::isConformRules)
.map(NacosServiceName::new)
.filter(serviceName::isCompatible)
.map(NacosServiceName::toString)
.collect(Collectors.toCollection(LinkedHashSet::new));
} catch (SkipFailbackWrapperException exception) {
throw exception;
} catch (Throwable cause) {
Expand All @@ -437,8 +433,6 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {
/**
* Verify whether it is a dubbo service
*
* @param serviceName
* @return
* @since 2.7.12
*/
private boolean isConformRules(String serviceName) {
Expand Down Expand Up @@ -510,14 +504,13 @@ private Set<String> getServiceNamesForOps(URL url) {

private Set<String> getAllServiceNames() {
try {
final Set<String> serviceNames = new LinkedHashSet<>();
int pageIndex = 1;
ListView<String> listView = namingService.getServicesOfServer(
pageIndex, PAGINATION_SIZE, getUrl().getGroup(Constants.DEFAULT_GROUP));
// First page data
List<String> firstPageData = listView.getData();
// Append first page into list
serviceNames.addAll(firstPageData);
final Set<String> serviceNames = new LinkedHashSet<>(firstPageData);
// the total count
int count = listView.getCount();
// the number of pages
Expand Down Expand Up @@ -608,18 +601,20 @@ private List<String> doGetServiceNames(URL url) {
public void destroy() {
super.destroy();
try {
this.namingService.shutdown();
} catch (NacosException e) {
logger.warn(REGISTRY_NACOS_EXCEPTION, "", "", "Unable to shutdown nacos naming service", e);
// Release the reference to the shared Nacos connection.
NacosNamingServiceUtils.releaseNamingService(getUrl());
} catch (Exception e) {
logger.warn(REGISTRY_NACOS_EXCEPTION, "", "", "Unable to release nacos naming service", e);
}
this.nacosListeners.clear();
this.originToAggregateListener.clear();
}

private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) {
consumerURL = removeParamsFromConsumer(consumerURL);
List<URL> urls = buildURLs(consumerURL, instances);
// Nacos does not support configurators and routers from registry, so all notifications are of providers type.
if (urls.size() == 0 && !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, DEFAULT_ENABLE_EMPTY_PROTECTION)) {
if (urls.isEmpty() && !getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, DEFAULT_ENABLE_EMPTY_PROTECTION)) {
logger.warn(
REGISTRY_NACOS_EXCEPTION,
"",
Expand Down Expand Up @@ -697,7 +692,7 @@ private void unsubscribeEventListener(String serviceName, final URL url, final N
private void notifySubscriber(
URL url, String serviceName, NacosAggregateListener listener, Collection<Instance> instances) {
List<Instance> enabledInstances = new LinkedList<>(instances);
if (enabledInstances.size() > 0) {
if (!enabledInstances.isEmpty()) {
// Instances
filterEnabledInstances(enabledInstances);
}
Expand All @@ -713,7 +708,9 @@ private void notifySubscriber(
* @return non-null array
*/
private List<String> getCategories(URL url) {
return ANY_VALUE.equals(url.getServiceInterface()) ? ALL_SUPPORTED_CATEGORIES : Arrays.asList(DEFAULT_CATEGORY);
return ANY_VALUE.equals(url.getServiceInterface())
? ALL_SUPPORTED_CATEGORIES
: Collections.singletonList(DEFAULT_CATEGORY);
}

private URL buildURL(URL consumerURL, Instance instance) {
Expand Down Expand Up @@ -770,7 +767,7 @@ private void filterEnabledInstances(Collection<Instance> instances) {
private interface NacosDataFilter<T> {

/**
* Tests whether or not the specified data should be accepted.
* Tests whether the specified data should be accepted.
*
* @param data The data to be tested
* @return <code>true</code> if and only if <code>data</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected void doUpdate(ServiceInstance oldServiceInstance, ServiceInstance newS
service.updateInstance(instance.getServiceName(), group, oldInstance, newInstance);
});
} catch (Exception e) {
throw new RpcException(REGISTRY_EXCEPTION, "Failed register instance " + newServiceInstance.toString(), e);
throw new RpcException(REGISTRY_EXCEPTION, "Failed register instance " + newServiceInstance, e);
}
}

Expand Down Expand Up @@ -232,11 +232,6 @@ public boolean isEmpty() {
}
}

@Override
public URL getUrl() {
return registryURL;
}

private void handleEvent(NamingEvent event, ServiceInstancesChangedListener listener) {
String serviceName = event.getServiceName();
List<ServiceInstance> serviceInstances = event.getInstances().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
import org.apache.dubbo.registry.nacos.NacosNamingServiceWrapper;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.utils.NamingUtils;

import static com.alibaba.nacos.api.common.Constants.DEFAULT_GROUP;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_NACOS_EXCEPTION;

/**
* The utilities class for {@link NamingService}
Expand All @@ -50,6 +57,21 @@ public class NacosNamingServiceUtils {

private static final String NACOS_CHECK_KEY = "nacos.check";

// Keeps track of the shared wrapper instance and how many registries are using it.
private static final class NacosNamingServiceHolder {
final NacosNamingServiceWrapper wrapper;

// Atomic counter to track usage.
final AtomicInteger refCount;

NacosNamingServiceHolder(NacosNamingServiceWrapper wrapper) {
this.wrapper = wrapper;
this.refCount = new AtomicInteger(1);
}
}

private static final ConcurrentMap<String, NacosNamingServiceHolder> SERVICE_CACHE = new ConcurrentHashMap<>();

private NacosNamingServiceUtils() {
throw new IllegalStateException("NacosNamingServiceUtils should not be instantiated");
}
Expand Down Expand Up @@ -104,22 +126,158 @@ public static String getGroup(URL connectionURL) {
return connectionURL.getParameter(NACOS_GROUP_KEY, group);
}

// This ensures that different registry groups sharing the same Nacos server and namespace reuse a single
// physical connection.
private static String createNamingServiceCacheKey(URL connectionURL) {
URL normalized = normalizeConnectionURL(connectionURL);
return normalized.toFullString();
}

/**
* Create an instance of {@link NamingService} from specified {@link URL connection url}
* Create or obtain a shared {@link NacosNamingServiceWrapper} for the given connection URL.
*
* @param connectionURL {@link URL connection url}
* @return {@link NamingService}
* @param connectionURL the registry connection URL
* @return a shared {@link NacosNamingServiceWrapper}
* @since 2.7.5
*/
public static NacosNamingServiceWrapper createNamingService(URL connectionURL) {
String key = createNamingServiceCacheKey(connectionURL);

// Create or retrieve the shared service holder.
NacosNamingServiceHolder holder = SERVICE_CACHE.compute(key, (k, v) -> {
if (v == null) {
logger.info("Creating shared NacosNamingService for key: {}", key);
NacosNamingServiceWrapper newWrapper = createWrapperInternal(connectionURL);
return new NacosNamingServiceHolder(newWrapper);
}
v.refCount.incrementAndGet();
return v;
});

return holder.wrapper;
}

/**
* Release a previously acquired {@link NacosNamingServiceWrapper} reference.
*
* <p>This method decrements the reference count associated with the normalized
* Nacos connection key. When the reference count reaches zero, the underlying
* {@link NacosNamingServiceWrapper} is shut down and removed from the cache.</p>
*
* <p>If the reference count becomes negative, it indicates a lifecycle bug
* (i.e. {@code releaseNamingService} was called more times than
* {@code createNamingService}).</p>
*
* @param connectionURL the registry connection URL used to identify the shared naming service
*/
public static void releaseNamingService(URL connectionURL) {
String key = createNamingServiceCacheKey(connectionURL);

SERVICE_CACHE.compute(key, (k, v) -> {
if (v == null) {
return null;
}

int left = v.refCount.decrementAndGet();

// If the count hits zero, this is the last user so we close the physical connection.
if (left == 0) {
try {
logger.info("Destroying shared NacosNamingService for key: {}", key);
v.wrapper.shutdown();
} catch (Exception e) {
logger.warn(
REGISTRY_NACOS_EXCEPTION, "", "", "Failed to destroy naming service for key: " + key, e);
}
return null;
}

// Error case: more releases than creates (unbalanced lifecycle)
if (left < 0) {
logger.warn(
REGISTRY_NACOS_EXCEPTION,
"",
"",
"releaseNamingService called more times than createNamingService for key: "
+ key + " (refCount=" + left + "). "
+ "This indicates a bug in caller lifecycle management.");
try {
v.wrapper.shutdown();
} catch (Exception e) {
logger.warn(
REGISTRY_NACOS_EXCEPTION, "", "", "Failed to destroy naming service for key: " + key, e);
}
v.refCount.set(0);
return null;
}

return v;
});
}

/**
* Create a new {@link NacosNamingServiceWrapper} using the normalized
* connection URL and configured retry options.
*/
private static NacosNamingServiceWrapper createWrapperInternal(URL connectionURL) {

// Use of normalized URL for connection identity / cache key. This ensures
// registry.group differences don't create separate physical connections.
URL normalized = normalizeConnectionURL(connectionURL);

// We do NOT embed them into the cache key because they represent per-creation behavior,
// not part of the identity of the physical Nacos server/namespace.
boolean check = connectionURL.getParameter(NACOS_CHECK_KEY, true);
int retryTimes = connectionURL.getPositiveParameter(NACOS_RETRY_KEY, 10);
int sleepMsBetweenRetries = connectionURL.getPositiveParameter(NACOS_RETRY_WAIT_KEY, 10);

if (check && !UrlUtils.isCheck(connectionURL)) {
check = false;
}

NacosConnectionManager nacosConnectionManager =
new NacosConnectionManager(connectionURL, check, retryTimes, sleepMsBetweenRetries);
new NacosConnectionManager(normalized, check, retryTimes, sleepMsBetweenRetries);
return new NacosNamingServiceWrapper(nacosConnectionManager, retryTimes, sleepMsBetweenRetries);
}

/**
* Normalize a Nacos registry connection URL for connection reuse.
*
* <p>This method produces a standard form of the connection URL that is
* suitable for use as a cache key. It performs the following normalization steps:</p>
*
* <p>Removes group-related parameters, since grouping affects only service
* registration semantics and not the underlying physical Nacos connection.</p>
*
* <p>Standardizes the server address list by trimming, sorting, and rejoining
* addresses, ensuring that different address orders map to the same connection.</p>
*
* @param connectionURL the original registry connection URL
* @return a normalized URL representing the unique physical Nacos connection
*/
private static URL normalizeConnectionURL(URL connectionURL) {

// Start from the original URL to preserve all parameters
URL normalized = connectionURL.removeParameter(GROUP_KEY).removeParameter(NACOS_GROUP_KEY);

// Standardize server addresses for stable cache keys
String serverAddr = normalized.getParameter("serverAddr", normalized.getAddress());
if (serverAddr != null) {
String canonical = Arrays.stream(serverAddr.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.sorted()
.collect(Collectors.joining(","));
normalized = normalized.addParameter("serverAddr", canonical);
}
return normalized;
}

static void clearCacheForTest() {
SERVICE_CACHE.clear();
}

static int getCacheSizeForTest() {
return SERVICE_CACHE.size();
}
}
Loading
Loading