Skip to content

Commit c76b80d

Browse files
committed
feat:support composite connector check event supported.
1 parent a219354 commit c76b80d

File tree

6 files changed

+167
-118
lines changed

6 files changed

+167
-118
lines changed

polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,17 @@ public interface ServerConnector extends Plugin {
4040
*/
4141
void registerServiceHandler(ServiceEventHandler handler) throws PolarisException;
4242

43+
/**
44+
* 检查服务事件是否支持
45+
*
46+
* @param eventType 服务事件类型
47+
* @return
48+
* @throws PolarisException
49+
*/
50+
default boolean checkEventSupported(ServiceEventKey.EventType eventType) throws PolarisException {
51+
return true;
52+
}
53+
4354
/**
4455
* 反注册服务监听器
4556
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making polaris-java available.
3+
*
4+
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.plugins.connector.common.utils;
19+
20+
import com.google.protobuf.StringValue;
21+
import com.google.protobuf.UInt32Value;
22+
import com.tencent.polaris.api.pojo.ServiceEventKey;
23+
import com.tencent.polaris.specification.api.v1.service.manage.RequestProto;
24+
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
25+
import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto;
26+
27+
import static com.tencent.polaris.specification.api.v1.model.CodeProto.Code.ExecuteSuccess;
28+
29+
/**
30+
* @author Haotian Zhang
31+
*/
32+
public class DiscoverUtils {
33+
34+
public static ResponseProto.DiscoverResponse buildEmptyResponse(ServiceEventKey serviceEventKey) {
35+
ResponseProto.DiscoverResponse.Builder builder = ResponseProto.DiscoverResponse.newBuilder();
36+
builder.setService(
37+
ServiceProto.Service.newBuilder().setName(StringValue.newBuilder().setValue(serviceEventKey.getService()).build())
38+
.setNamespace(
39+
StringValue.newBuilder().setValue(serviceEventKey.getNamespace()).build()));
40+
builder.setCode(UInt32Value.newBuilder().setValue(ExecuteSuccess.getNumber()).build());
41+
builder.setType(buildDiscoverResponseType(serviceEventKey.getEventType()));
42+
return builder.build();
43+
}
44+
45+
public static RequestProto.DiscoverRequest.DiscoverRequestType buildDiscoverRequestType(
46+
ServiceEventKey.EventType type) {
47+
switch (type) {
48+
case INSTANCE:
49+
return RequestProto.DiscoverRequest.DiscoverRequestType.INSTANCE;
50+
case ROUTING:
51+
return RequestProto.DiscoverRequest.DiscoverRequestType.ROUTING;
52+
case RATE_LIMITING:
53+
return RequestProto.DiscoverRequest.DiscoverRequestType.RATE_LIMIT;
54+
case CIRCUIT_BREAKING:
55+
return RequestProto.DiscoverRequest.DiscoverRequestType.CIRCUIT_BREAKER;
56+
case SERVICE:
57+
return RequestProto.DiscoverRequest.DiscoverRequestType.SERVICES;
58+
case FAULT_DETECTING:
59+
return RequestProto.DiscoverRequest.DiscoverRequestType.FAULT_DETECTOR;
60+
case LANE_RULE:
61+
return RequestProto.DiscoverRequest.DiscoverRequestType.LANE;
62+
case NEARBY_ROUTE_RULE:
63+
return RequestProto.DiscoverRequest.DiscoverRequestType.NEARBY_ROUTE_RULE;
64+
case LOSSLESS:
65+
return RequestProto.DiscoverRequest.DiscoverRequestType.LOSSLESS;
66+
case BLOCK_ALLOW_RULE:
67+
return RequestProto.DiscoverRequest.DiscoverRequestType.BLOCK_ALLOW_RULE;
68+
default:
69+
return RequestProto.DiscoverRequest.DiscoverRequestType.UNKNOWN;
70+
}
71+
}
72+
73+
public static ResponseProto.DiscoverResponse.DiscoverResponseType buildDiscoverResponseType(
74+
ServiceEventKey.EventType type) {
75+
switch (type) {
76+
case INSTANCE:
77+
return ResponseProto.DiscoverResponse.DiscoverResponseType.INSTANCE;
78+
case ROUTING:
79+
return ResponseProto.DiscoverResponse.DiscoverResponseType.ROUTING;
80+
case RATE_LIMITING:
81+
return ResponseProto.DiscoverResponse.DiscoverResponseType.RATE_LIMIT;
82+
case CIRCUIT_BREAKING:
83+
return ResponseProto.DiscoverResponse.DiscoverResponseType.CIRCUIT_BREAKER;
84+
case SERVICE:
85+
return ResponseProto.DiscoverResponse.DiscoverResponseType.SERVICES;
86+
case FAULT_DETECTING:
87+
return ResponseProto.DiscoverResponse.DiscoverResponseType.FAULT_DETECTOR;
88+
case LANE_RULE:
89+
return ResponseProto.DiscoverResponse.DiscoverResponseType.LANE;
90+
case NEARBY_ROUTE_RULE:
91+
return ResponseProto.DiscoverResponse.DiscoverResponseType.NEARBY_ROUTE_RULE;
92+
case LOSSLESS:
93+
return ResponseProto.DiscoverResponse.DiscoverResponseType.LOSSLESS;
94+
case BLOCK_ALLOW_RULE:
95+
return ResponseProto.DiscoverResponse.DiscoverResponseType.BLOCK_ALLOW_RULE;
96+
default:
97+
return ResponseProto.DiscoverResponse.DiscoverResponseType.UNKNOWN;
98+
}
99+
}
100+
101+
public static ServiceEventKey.EventType buildEventType(ResponseProto.DiscoverResponse.DiscoverResponseType responseType) {
102+
switch (responseType) {
103+
case INSTANCE:
104+
return ServiceEventKey.EventType.INSTANCE;
105+
case ROUTING:
106+
return ServiceEventKey.EventType.ROUTING;
107+
case RATE_LIMIT:
108+
return ServiceEventKey.EventType.RATE_LIMITING;
109+
case CIRCUIT_BREAKER:
110+
return ServiceEventKey.EventType.CIRCUIT_BREAKING;
111+
case SERVICES:
112+
return ServiceEventKey.EventType.SERVICE;
113+
case FAULT_DETECTOR:
114+
return ServiceEventKey.EventType.FAULT_DETECTING;
115+
case LANE:
116+
return ServiceEventKey.EventType.LANE_RULE;
117+
case NEARBY_ROUTE_RULE:
118+
return ServiceEventKey.EventType.NEARBY_ROUTE_RULE;
119+
case LOSSLESS:
120+
return ServiceEventKey.EventType.LOSSLESS;
121+
case BLOCK_ALLOW_RULE:
122+
return ServiceEventKey.EventType.BLOCK_ALLOW_RULE;
123+
default:
124+
return ServiceEventKey.EventType.UNKNOWN;
125+
}
126+
}
127+
}

polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask;
3737
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant;
3838
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type;
39+
import com.tencent.polaris.plugins.connector.common.utils.DiscoverUtils;
3940
import com.tencent.polaris.plugins.connector.composite.zero.TestConnectivityTask;
4041
import com.tencent.polaris.plugins.connector.composite.zero.TestConnectivityTaskManager;
4142
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
@@ -154,10 +155,29 @@ public void postContextInit(Extensions ctx) throws PolarisException {
154155
@Override
155156
public void registerServiceHandler(ServiceEventHandler handler) throws PolarisException {
156157
checkDestroyed();
158+
ServiceEventKey serviceEventKey = handler.getServiceEventKey();
159+
if (!checkEventSupported(serviceEventKey.getEventType())) {
160+
LOG.info("[CompositeConnector] not supported event type for {}", serviceEventKey);
161+
handler.getEventHandler()
162+
.onEventUpdate(new ServerEvent(serviceEventKey, DiscoverUtils.buildEmptyResponse(serviceEventKey), null));
163+
return;
164+
}
157165
ServiceUpdateTask serviceUpdateTask = new CompositeServiceUpdateTask(handler, this);
158166
submitServiceHandler(serviceUpdateTask, 0);
159167
}
160168

169+
@Override
170+
public boolean checkEventSupported(ServiceEventKey.EventType eventType) throws PolarisException {
171+
checkDestroyed();
172+
boolean supported = true;
173+
for (DestroyableServerConnector sc : serverConnectors) {
174+
if (!sc.checkEventSupported(eventType)) {
175+
supported = false;
176+
}
177+
}
178+
return supported;
179+
}
180+
161181
@Override
162182
public void deRegisterServiceHandler(ServiceEventKey eventKey) throws PolarisException {
163183
checkDestroyed();

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@
4545
import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask;
4646
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status;
4747
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type;
48+
import com.tencent.polaris.plugins.connector.common.utils.DiscoverUtils;
4849
import com.tencent.polaris.plugins.connector.grpc.Connection.ConnID;
4950
import com.tencent.polaris.specification.api.v1.model.ModelProto;
5051
import com.tencent.polaris.specification.api.v1.service.manage.*;
5152
import com.tencent.polaris.specification.api.v1.service.manage.ClientProto.Client;
5253
import com.tencent.polaris.specification.api.v1.service.manage.ClientProto.StatInfo;
5354
import com.tencent.polaris.specification.api.v1.service.manage.RequestProto.DiscoverRequest;
5455
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto.DiscoverResponse;
55-
import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto.Service;
5656
import io.grpc.stub.StreamObserver;
5757
import org.slf4j.Logger;
5858

@@ -61,7 +61,6 @@
6161
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
6262
import java.util.concurrent.atomic.AtomicReference;
6363

64-
import static com.tencent.polaris.specification.api.v1.model.CodeProto.Code.ExecuteSuccess;
6564
import static com.tencent.polaris.specification.api.v1.model.CodeProto.Code.InvalidDiscoverResource;
6665

6766
/**
@@ -191,25 +190,14 @@ public void registerServiceHandler(ServiceEventHandler handler) {
191190
if (!checkEventSupported(serviceEventKey.getEventType())) {
192191
LOG.info("[ServerConnector] not supported event type for {}", serviceEventKey);
193192
handler.getEventHandler()
194-
.onEventUpdate(new ServerEvent(serviceEventKey, buildEmptyResponse(serviceEventKey), null));
193+
.onEventUpdate(new ServerEvent(serviceEventKey, DiscoverUtils.buildEmptyResponse(serviceEventKey), null));
195194
return;
196195
}
197196
ServiceUpdateTask serviceUpdateTask = new GrpcServiceUpdateTask(handler, this);
198197
submitServiceHandler(serviceUpdateTask, 0);
199198
}
200199

201-
private DiscoverResponse buildEmptyResponse(ServiceEventKey serviceEventKey) {
202-
DiscoverResponse.Builder builder = DiscoverResponse.newBuilder();
203-
builder.setService(
204-
Service.newBuilder().setName(StringValue.newBuilder().setValue(serviceEventKey.getService()).build())
205-
.setNamespace(
206-
StringValue.newBuilder().setValue(serviceEventKey.getNamespace()).build()));
207-
builder.setCode(UInt32Value.newBuilder().setValue(ExecuteSuccess.getNumber()).build());
208-
builder.setType(GrpcUtil.buildDiscoverResponseType(serviceEventKey.getEventType()));
209-
return builder.build();
210-
}
211-
212-
private boolean checkEventSupported(EventType eventType) {
200+
public boolean checkEventSupported(EventType eventType) {
213201
Boolean aBoolean = supportedResourcesType.get(eventType);
214202
if (null != aBoolean) {
215203
return aBoolean;
@@ -257,7 +245,7 @@ public void onCompleted() {
257245
}
258246
});
259247
RequestProto.DiscoverRequest.Builder req = RequestProto.DiscoverRequest.newBuilder();
260-
req.setType(GrpcUtil.buildDiscoverRequestType(eventType));
248+
req.setType(DiscoverUtils.buildDiscoverRequestType(eventType));
261249
discoverClient.onNext(req.build());
262250
try {
263251
countDownLatch.await();

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcUtil.java

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@
2020
import com.tencent.polaris.api.exception.ErrorCode;
2121
import com.tencent.polaris.api.exception.PolarisException;
2222
import com.tencent.polaris.api.exception.ServerCodes;
23-
import com.tencent.polaris.api.pojo.ServiceEventKey;
24-
import com.tencent.polaris.api.pojo.ServiceEventKey.EventType;
2523
import com.tencent.polaris.api.utils.MapUtils;
2624
import com.tencent.polaris.api.utils.StringUtils;
27-
import com.tencent.polaris.specification.api.v1.service.manage.RequestProto.DiscoverRequest.DiscoverRequestType;
2825
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
29-
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto.DiscoverResponse.DiscoverResponseType;
3026
import io.grpc.Metadata;
3127
import io.grpc.Status;
3228
import io.grpc.StatusRuntimeException;
@@ -234,88 +230,4 @@ public static void checkGrpcException(Throwable t) throws PolarisException {
234230
throw new PolarisException(ErrorCode.SERVER_ERROR, grpcEx.getMessage());
235231
}
236232
}
237-
238-
public static DiscoverRequestType buildDiscoverRequestType(
239-
ServiceEventKey.EventType type) {
240-
switch (type) {
241-
case INSTANCE:
242-
return DiscoverRequestType.INSTANCE;
243-
case ROUTING:
244-
return DiscoverRequestType.ROUTING;
245-
case RATE_LIMITING:
246-
return DiscoverRequestType.RATE_LIMIT;
247-
case CIRCUIT_BREAKING:
248-
return DiscoverRequestType.CIRCUIT_BREAKER;
249-
case SERVICE:
250-
return DiscoverRequestType.SERVICES;
251-
case FAULT_DETECTING:
252-
return DiscoverRequestType.FAULT_DETECTOR;
253-
case LANE_RULE:
254-
return DiscoverRequestType.LANE;
255-
case NEARBY_ROUTE_RULE:
256-
return DiscoverRequestType.NEARBY_ROUTE_RULE;
257-
case LOSSLESS:
258-
return DiscoverRequestType.LOSSLESS;
259-
case BLOCK_ALLOW_RULE:
260-
return DiscoverRequestType.BLOCK_ALLOW_RULE;
261-
default:
262-
return DiscoverRequestType.UNKNOWN;
263-
}
264-
}
265-
266-
public static DiscoverResponseType buildDiscoverResponseType(
267-
ServiceEventKey.EventType type) {
268-
switch (type) {
269-
case INSTANCE:
270-
return DiscoverResponseType.INSTANCE;
271-
case ROUTING:
272-
return DiscoverResponseType.ROUTING;
273-
case RATE_LIMITING:
274-
return DiscoverResponseType.RATE_LIMIT;
275-
case CIRCUIT_BREAKING:
276-
return DiscoverResponseType.CIRCUIT_BREAKER;
277-
case SERVICE:
278-
return DiscoverResponseType.SERVICES;
279-
case FAULT_DETECTING:
280-
return DiscoverResponseType.FAULT_DETECTOR;
281-
case LANE_RULE:
282-
return DiscoverResponseType.LANE;
283-
case NEARBY_ROUTE_RULE:
284-
return DiscoverResponseType.NEARBY_ROUTE_RULE;
285-
case LOSSLESS:
286-
return DiscoverResponseType.LOSSLESS;
287-
case BLOCK_ALLOW_RULE:
288-
return DiscoverResponseType.BLOCK_ALLOW_RULE;
289-
default:
290-
return DiscoverResponseType.UNKNOWN;
291-
}
292-
}
293-
294-
public static EventType buildEventType(DiscoverResponseType responseType) {
295-
switch (responseType) {
296-
case INSTANCE:
297-
return EventType.INSTANCE;
298-
case ROUTING:
299-
return EventType.ROUTING;
300-
case RATE_LIMIT:
301-
return EventType.RATE_LIMITING;
302-
case CIRCUIT_BREAKER:
303-
return EventType.CIRCUIT_BREAKING;
304-
case SERVICES:
305-
return EventType.SERVICE;
306-
case FAULT_DETECTOR:
307-
return EventType.FAULT_DETECTING;
308-
case LANE:
309-
return EventType.LANE_RULE;
310-
case NEARBY_ROUTE_RULE:
311-
return EventType.NEARBY_ROUTE_RULE;
312-
case LOSSLESS:
313-
return EventType.LOSSLESS;
314-
case BLOCK_ALLOW_RULE:
315-
return EventType.BLOCK_ALLOW_RULE;
316-
default:
317-
return EventType.UNKNOWN;
318-
}
319-
}
320-
321233
}

0 commit comments

Comments
 (0)