Skip to content

Commit 06f9aff

Browse files
committed
feat:support stat and event report with service discovery.
1 parent 5a9753c commit 06f9aff

File tree

13 files changed

+556
-73
lines changed

13 files changed

+556
-73
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making polaris-java available.
3+
*
4+
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.client.remote;
19+
20+
import com.tencent.polaris.annonation.JustForTest;
21+
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
22+
import com.tencent.polaris.api.config.verify.DefaultValues;
23+
import com.tencent.polaris.api.exception.PolarisException;
24+
import com.tencent.polaris.api.plugin.compose.Extensions;
25+
import com.tencent.polaris.api.pojo.Instance;
26+
import com.tencent.polaris.api.pojo.ServiceKey;
27+
import com.tencent.polaris.api.utils.CollectionUtils;
28+
import com.tencent.polaris.api.utils.StringUtils;
29+
import com.tencent.polaris.client.flow.BaseFlow;
30+
import com.tencent.polaris.client.pojo.Node;
31+
import com.tencent.polaris.client.util.CommonValidator;
32+
import com.tencent.polaris.logging.LoggerFactory;
33+
import org.slf4j.Logger;
34+
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
38+
/**
39+
* Repository for service addresses.
40+
*
41+
* @author Haotian Zhang
42+
*/
43+
public class ServiceAddressRepository {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(ServiceAddressRepository.class);
46+
47+
private final List<Node> nodes;
48+
49+
private int curIndex;
50+
51+
private final String clientId;
52+
53+
private final Extensions extensions;
54+
55+
private final ServiceKey remoteCluster;
56+
57+
private final List<String> routers;
58+
59+
private final String lbPolicy;
60+
61+
private final String protocol;
62+
63+
public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
64+
ServiceKey remoteCluster) {
65+
this(addresses, clientId, extensions, remoteCluster, null, null, null);
66+
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA);
67+
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY);
68+
}
69+
70+
public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
71+
ServiceKey remoteCluster, List<String> routers, String lbPolicy, String protocol) {
72+
// to ip addresses.
73+
this.nodes = new ArrayList<>();
74+
if (CollectionUtils.isNotEmpty(addresses)) {
75+
for (String address : addresses) {
76+
if (StringUtils.isNotBlank(address)) {
77+
int colonIdx = address.lastIndexOf(":");
78+
if (colonIdx > 0 && colonIdx < address.length() - 1) {
79+
String host = address.substring(0, colonIdx);
80+
try {
81+
int port = Integer.parseInt(address.substring(colonIdx + 1));
82+
nodes.add(new Node(host, port));
83+
} catch (NumberFormatException e) {
84+
LOG.warn("Invalid port number in address: {}", address);
85+
}
86+
} else {
87+
LOG.warn("Invalid address format, expected 'host:port': {}", address);
88+
}
89+
}
90+
}
91+
}
92+
this.curIndex = 0;
93+
94+
// from discovery.
95+
this.clientId = clientId;
96+
this.extensions = extensions;
97+
CommonValidator.validateNamespaceService(remoteCluster.getNamespace(), remoteCluster.getService());
98+
this.remoteCluster = remoteCluster;
99+
if (CollectionUtils.isEmpty(routers)) {
100+
this.routers = new ArrayList<>();
101+
} else {
102+
this.routers = routers;
103+
}
104+
if (StringUtils.isBlank(lbPolicy)) {
105+
this.lbPolicy = DefaultValues.DEFAULT_LOADBALANCER;
106+
} else {
107+
this.lbPolicy = lbPolicy;
108+
}
109+
if (StringUtils.isBlank(protocol)) {
110+
this.protocol = "http";
111+
} else {
112+
this.protocol = protocol;
113+
}
114+
}
115+
116+
public String getServiceAddress() throws PolarisException {
117+
Node node = getServiceAddressNode();
118+
return node.getHostPort();
119+
}
120+
121+
public Node getServiceAddressNode() throws PolarisException {
122+
if (CollectionUtils.isNotEmpty(nodes)) {
123+
Node node = nodes.get(Math.abs(curIndex % nodes.size()));
124+
curIndex = (curIndex + 1) % Integer.MAX_VALUE;
125+
if (LOG.isDebugEnabled()) {
126+
LOG.debug("success to get instance, instance is {}:{}", node.getHost(), node.getPort());
127+
}
128+
return node;
129+
}
130+
Instance instance = getDiscoverInstance();
131+
if (LOG.isDebugEnabled()) {
132+
LOG.debug("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
133+
}
134+
return new Node(instance.getHost(), instance.getPort());
135+
}
136+
137+
private Instance getDiscoverInstance() throws PolarisException {
138+
Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, clientId);
139+
LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
140+
return instance;
141+
}
142+
143+
@JustForTest
144+
List<Node> getNodes() {
145+
return nodes;
146+
}
147+
148+
@JustForTest
149+
List<String> getRouters() {
150+
return routers;
151+
}
152+
153+
@JustForTest
154+
String getLbPolicy() {
155+
return lbPolicy;
156+
}
157+
158+
@JustForTest
159+
String getProtocol() {
160+
return protocol;
161+
}
162+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package com.tencent.polaris.client.remote;
2+
3+
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
4+
import com.tencent.polaris.api.exception.ErrorCode;
5+
import com.tencent.polaris.api.exception.PolarisException;
6+
import com.tencent.polaris.api.plugin.compose.Extensions;
7+
import com.tencent.polaris.api.pojo.Instance;
8+
import com.tencent.polaris.api.pojo.ServiceKey;
9+
import com.tencent.polaris.client.flow.BaseFlow;
10+
import com.tencent.polaris.client.pojo.Node;
11+
import org.junit.AfterClass;
12+
import org.junit.Before;
13+
import org.junit.BeforeClass;
14+
import org.junit.Test;
15+
import org.junit.runner.RunWith;
16+
import org.mockito.Mock;
17+
import org.mockito.MockedStatic;
18+
import org.mockito.Mockito;
19+
import org.mockito.junit.MockitoJUnitRunner;
20+
21+
import java.util.Arrays;
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import static org.junit.Assert.*;
26+
import static org.mockito.ArgumentMatchers.*;
27+
import static org.mockito.Mockito.when;
28+
29+
/**
30+
* Test for {@link ServiceAddressRepository}.
31+
*
32+
* @author Haotian Zhang
33+
*/
34+
@RunWith(MockitoJUnitRunner.class)
35+
public class ServiceAddressRepositoryTest {
36+
37+
private static MockedStatic<BaseFlow> mockedBaseFlow;
38+
39+
@Mock
40+
private Extensions extensions;
41+
42+
@Mock
43+
private static Instance mockInstance;
44+
45+
private final ServiceKey remoteCluster = new ServiceKey("test-namespace", "test-service");
46+
private final String clientId = "test-client";
47+
48+
@BeforeClass
49+
public static void beforeClass() {
50+
mockedBaseFlow = Mockito.mockStatic(BaseFlow.class);
51+
}
52+
53+
@Before
54+
public void setUp() {
55+
when(mockInstance.getHost()).thenReturn("1.2.3.4");
56+
when(mockInstance.getPort()).thenReturn(8080);
57+
}
58+
59+
@AfterClass
60+
public static void AfterClass() {
61+
if (mockedBaseFlow != null) {
62+
mockedBaseFlow.close();
63+
}
64+
}
65+
66+
@Test
67+
public void testConstructorWithDefaultParams() {
68+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
69+
ServiceAddressRepository repository = new ServiceAddressRepository(
70+
addresses, clientId, extensions, remoteCluster);
71+
72+
assertNotNull(repository);
73+
assertEquals(2, repository.getNodes().size());
74+
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_METADATA, repository.getRouters().get(0));
75+
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY, repository.getRouters().get(1));
76+
assertEquals("http", repository.getProtocol());
77+
}
78+
79+
@Test
80+
public void testConstructorWithCustomParams() {
81+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
82+
List<String> routers = Arrays.asList("custom-router1", "custom-router2");
83+
String lbPolicy = "custom-lb";
84+
String protocol = "grpc";
85+
86+
ServiceAddressRepository repository = new ServiceAddressRepository(
87+
addresses, clientId, extensions, remoteCluster, routers, lbPolicy, protocol);
88+
89+
assertNotNull(repository);
90+
assertEquals(2, repository.getNodes().size());
91+
assertEquals("custom-router1", repository.getRouters().get(0));
92+
assertEquals("custom-router2", repository.getRouters().get(1));
93+
assertEquals("custom-lb", repository.getLbPolicy());
94+
assertEquals("grpc", repository.getProtocol());
95+
}
96+
97+
@Test
98+
public void testConstructorWithEmptyAddresses() {
99+
ServiceAddressRepository repository = new ServiceAddressRepository(
100+
null, clientId, extensions, remoteCluster);
101+
102+
assertNotNull(repository);
103+
assertTrue(repository.getNodes().isEmpty());
104+
}
105+
106+
@Test
107+
public void testConstructorWithInvalidAddresses() {
108+
List<String> addresses = Arrays.asList("host1", "host2:", ":8080", "host3:invalid", "");
109+
ServiceAddressRepository repository = new ServiceAddressRepository(
110+
addresses, clientId, extensions, remoteCluster);
111+
112+
assertNotNull(repository);
113+
assertTrue(repository.getNodes().isEmpty());
114+
}
115+
116+
@Test
117+
public void testGetServiceAddressNodeWithLocalNodes() throws PolarisException {
118+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090", "host3:7070");
119+
ServiceAddressRepository repository = new ServiceAddressRepository(
120+
addresses, clientId, extensions, remoteCluster);
121+
122+
// First call
123+
Node node1 = repository.getServiceAddressNode();
124+
assertEquals("host1", node1.getHost());
125+
assertEquals(8080, node1.getPort());
126+
127+
// Second call - should round robin
128+
Node node2 = repository.getServiceAddressNode();
129+
assertEquals("host2", node2.getHost());
130+
assertEquals(9090, node2.getPort());
131+
132+
// Third call
133+
Node node3 = repository.getServiceAddressNode();
134+
assertEquals("host3", node3.getHost());
135+
assertEquals(7070, node3.getPort());
136+
137+
// Fourth call - should wrap around
138+
Node node4 = repository.getServiceAddressNode();
139+
assertEquals("host1", node4.getHost());
140+
assertEquals(8080, node4.getPort());
141+
}
142+
143+
@Test
144+
public void testGetServiceAddressNodeWithEmptyNodes() throws PolarisException {
145+
ServiceAddressRepository repository = new ServiceAddressRepository(
146+
Collections.emptyList(), clientId, extensions, remoteCluster);
147+
148+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
149+
any(), any(), anyList(), anyString(), anyString(), anyString()))
150+
.thenReturn(mockInstance);
151+
152+
Node node = repository.getServiceAddressNode();
153+
assertEquals("1.2.3.4", node.getHost());
154+
assertEquals(8080, node.getPort());
155+
}
156+
157+
@Test(expected = PolarisException.class)
158+
public void testGetServiceAddressNodeWithDiscoveryFailure() throws PolarisException {
159+
ServiceAddressRepository repository = new ServiceAddressRepository(
160+
Collections.emptyList(), clientId, extensions, remoteCluster);
161+
162+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
163+
any(), any(), anyList(), anyString(), anyString(), anyString()))
164+
.thenThrow(new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "Discovery failed"));
165+
166+
repository.getServiceAddressNode();
167+
}
168+
169+
@Test
170+
public void testGetServiceAddress() throws PolarisException {
171+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
172+
ServiceAddressRepository repository = new ServiceAddressRepository(
173+
addresses, clientId, extensions, remoteCluster);
174+
175+
String address1 = repository.getServiceAddress();
176+
assertTrue(address1.equals("host1:8080") || address1.equals("host2:9090"));
177+
178+
String address2 = repository.getServiceAddress();
179+
assertNotEquals(address1, address2); // Should be different due to round robin
180+
}
181+
182+
@Test
183+
public void testGetServiceAddressWithDiscovery() throws PolarisException {
184+
ServiceAddressRepository repository = new ServiceAddressRepository(
185+
Collections.emptyList(), clientId, extensions, remoteCluster);
186+
187+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
188+
any(), any(), anyList(), anyString(), anyString(), anyString()))
189+
.thenReturn(mockInstance);
190+
191+
String address = repository.getServiceAddress();
192+
assertEquals("1.2.3.4:8080", address);
193+
}
194+
}

polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,13 @@ global:
9797
# 描述:PushGateway 事件上报开关
9898
enable: false
9999
# 描述:PushGateway 事件上报队列长度
100-
eventQueueSize: 10000;
100+
eventQueueSize: 1000;
101101
# 描述:PushGateway 事件上报最大批量大小
102102
maxBatchSize: 100
103+
# 描述:事件服务的命名空间
104+
namespace: Polaris
105+
# 描述:事件服务的服务名
106+
service: polaris.pushgateway
103107
# 描述:Admin相关的配置
104108
admin:
105109
# 描述:Admin的监听的IP
@@ -131,6 +135,10 @@ global:
131135
# #范围:[1s:...]
132136
# #默认值:10s
133137
# pushInterval: 10s
138+
# 描述:监控服务的命名空间
139+
namespace: Polaris
140+
# 描述:监控服务的服务名
141+
service: polaris.pushgateway
134142
location:
135143
providers:
136144
- type: local

0 commit comments

Comments
 (0)