Skip to content

Commit cd838a5

Browse files
author
fishtailfu
committed
feat: support dual-discovery
1 parent 9766ef7 commit cd838a5

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosConnector.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -264,12 +264,7 @@ public CommonProviderResponse registerInstance(CommonProviderRequest req,
264264

265265
try {
266266
Instance instance = buildRegisterNacosInstance(req);
267-
// 优先设置成nacos的service name,如没有再设置成req的service name
268-
String serviceName = req.getService();
269-
if (StringUtils.isNotEmpty(nacosContext.getServiceName())) {
270-
serviceName = nacosContext.getServiceName();
271-
}
272-
namingService.registerInstance(serviceName,
267+
namingService.registerInstance(instance.getServiceName(),
273268
nacosContext.getGroupName(), instance);
274269
response.setInstanceID(instance.getInstanceId());
275270
} catch (NacosException e) {
@@ -376,7 +371,12 @@ public void addLongRunningTask(ServiceUpdateTask serviceUpdateTask) {
376371
@Override
377372
protected void doDestroy() {
378373
if (initialized.compareAndSet(true, false)) {
379-
// shutdown naming service
374+
// 先unsubscribe listener
375+
if (CollectionUtils.isNotEmpty(nacosServices)) {
376+
nacosServices.forEach((s, nacosService) -> {
377+
nacosService.destroy();
378+
});
379+
}
380380
if (CollectionUtils.isNotEmpty(namingServices)) {
381381
namingServices.forEach((s, namingService) -> {
382382
try {
@@ -385,11 +385,7 @@ protected void doDestroy() {
385385
}
386386
});
387387
}
388-
if (CollectionUtils.isNotEmpty(nacosServices)) {
389-
nacosServices.forEach((s, nacosService) -> {
390-
nacosService.destroy();
391-
});
392-
}
388+
393389
}
394390
}
395391

polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.List;
5555
import java.util.Map;
5656
import java.util.Set;
57+
import java.util.concurrent.ConcurrentHashMap;
5758
import java.util.concurrent.ExecutorService;
5859
import java.util.concurrent.Executors;
5960
import org.slf4j.Logger;
@@ -70,11 +71,14 @@ public class NacosService extends Destroyable {
7071

7172
private static final int NACOS_SERVICE_PAGESIZE = 10;
7273

74+
private Map<String, EventListener> eventListeners;
75+
7376
public NacosService(NamingService namingService, NacosContext nacosContext) {
7477
this.namingService = namingService;
7578
this.nacosContext = nacosContext;
7679
NamedThreadFactory threadFactory = new NamedThreadFactory("nacos-service");
7780
this.refreshExecutor = Executors.newFixedThreadPool(8, threadFactory);
81+
this.eventListeners = new ConcurrentHashMap<>();
7882
}
7983

8084

@@ -144,7 +148,8 @@ public void asyncGetInstances(ServiceUpdateTask serviceUpdateTask) {
144148
newDiscoverResponseBuilder.addAllInstances(polarisInstanceList);
145149
int code = ServerCodes.EXECUTE_SUCCESS;
146150
newDiscoverResponseBuilder.setCode(UInt32Value.of(code));
147-
LOG.debug("[NacosConnector] Subscribe instances of {} success. ", serviceUpdateTask.getServiceEventKey().getService());
151+
LOG.debug("[NacosConnector] Subscribe instances of {} success. ",
152+
serviceUpdateTask.getServiceEventKey().getService());
148153
ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(),
149154
newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_NACOS);
150155
boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent);
@@ -173,6 +178,8 @@ public void asyncGetInstances(ServiceUpdateTask serviceUpdateTask) {
173178
try {
174179
namingService.subscribe(serviceUpdateTask.getServiceEventKey().getService(), nacosContext.getGroupName(),
175180
serviceListener);
181+
eventListeners.put(serviceUpdateTask.getServiceEventKey().getService(), serviceListener);
182+
176183
} catch (NacosException e) {
177184
LOG.error("Get nacos service instances of {} failed. ", serviceUpdateTask.getServiceEventKey().getService(),
178185
e);
@@ -241,7 +248,8 @@ private void syncGetService(ServiceUpdateTask serviceUpdateTask) {
241248

242249
int code = ServerCodes.EXECUTE_SUCCESS;
243250
newDiscoverResponseBuilder.setCode(UInt32Value.of(code));
244-
LOG.debug("[NacosConnector] get service of {} success. ", serviceUpdateTask.getServiceEventKey().getService());
251+
LOG.debug("[NacosConnector] get service of {} success. ",
252+
serviceUpdateTask.getServiceEventKey().getService());
245253
ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(),
246254
newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_NACOS);
247255
boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent);
@@ -269,6 +277,13 @@ private String buildRevision(List<Instance> instances) throws Exception {
269277

270278
@Override
271279
protected void doDestroy() {
280+
for (Map.Entry<String, EventListener> entry : eventListeners.entrySet()) {
281+
try {
282+
namingService.unsubscribe(entry.getKey(), nacosContext.getGroupName(), entry.getValue());
283+
} catch (NacosException e) {
284+
LOG.error("[NacosConnector] unsubscribe service {} in group {} failed. ", entry.getKey(), nacosContext.getGroupName());
285+
}
286+
}
272287
ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{refreshExecutor});
273288
}
274289

0 commit comments

Comments
 (0)