diff --git a/polaris-common/polaris-metadata/src/main/java/com/tencent/polaris/metadata/core/manager/MetadataContextHolder.java b/polaris-common/polaris-metadata/src/main/java/com/tencent/polaris/metadata/core/manager/MetadataContextHolder.java index 2052038c8..29aa42376 100644 --- a/polaris-common/polaris-metadata/src/main/java/com/tencent/polaris/metadata/core/manager/MetadataContextHolder.java +++ b/polaris-common/polaris-metadata/src/main/java/com/tencent/polaris/metadata/core/manager/MetadataContextHolder.java @@ -20,12 +20,33 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + public class MetadataContextHolder { - private static final ThreadLocal THREAD_LOCAL_CONTEXT = new ThreadLocal<>(); + private static final Logger LOG = LoggerFactory.getLogger(MetadataContextHolder.class); + + private static final ThreadLocal THREAD_LOCAL_CONTEXT; private static Supplier initializer; + static { + ThreadLocal tempThreadLocalContext; + try { + // the class name need to be excluded in maven shade plugin + Class clazz = Class.forName("com.alibaba.ttl.TransmittableThreadLocal"); + tempThreadLocalContext = (ThreadLocal) clazz.getDeclaredConstructor(new Class[0]).newInstance(); + LOG.info("Use TransmittableThreadLocal for thread local context"); + } catch (Exception e) { + LOG.debug("Failed to use TransmittableThreadLocal for thread local context, msg: {}", e.getMessage()); + LOG.info("Use standard ThreadLocal for thread local context"); + tempThreadLocalContext = new ThreadLocal<>(); + } + THREAD_LOCAL_CONTEXT = tempThreadLocalContext; + } + + public static MetadataContext getOrCreate() { MetadataContext metadataContext = THREAD_LOCAL_CONTEXT.get(); if (null != metadataContext) { diff --git a/polaris-distribution/polaris-all/pom.xml b/polaris-distribution/polaris-all/pom.xml index 3dc271e77..73117ee84 100644 --- a/polaris-distribution/polaris-all/pom.xml +++ b/polaris-distribution/polaris-all/pom.xml @@ -171,6 +171,9 @@ com.alibaba shade.polaris.com.alibaba + + com.alibaba.ttl.TransmittableThreadLocal + io.perfmark diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java index 3db2eedcd..c738ab67d 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java @@ -64,7 +64,9 @@ import java.util.concurrent.atomic.AtomicReference; import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; +import static com.tencent.polaris.metadata.core.constant.TsfMetadataConstants.TSF_APPLICATION_ID; import static com.tencent.polaris.metadata.core.constant.TsfMetadataConstants.TSF_GROUP_ID; +import static com.tencent.polaris.metadata.core.constant.TsfMetadataConstants.TSF_NAMESPACE_ID; import static com.tencent.polaris.plugins.connector.consul.service.common.TagConditionUtil.parseMatchStringType; /** @@ -216,6 +218,9 @@ private List parseResponse(LaneInfoResponse laneInfoRespons if (laneGroup.isEntrance()) { entranceList.add(laneGroup.getGroupId()); } + if (StringUtils.isNotEmpty(laneGroup.getNamespaceId()) && StringUtils.isNotEmpty(laneGroup.getApplicationId())) { + laneGroupBuilder.putMetadata(laneGroup.getNamespaceId() + "," + laneGroup.getApplicationId(), TSF_NAMESPACE_ID + "," + TSF_APPLICATION_ID); + } } } for (String entrance : entranceList) { diff --git a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java index faae0ff37..3054983d0 100644 --- a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java +++ b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java @@ -17,43 +17,42 @@ package com.tencent.polaris.plugins.router.lane; -import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; -import com.tencent.polaris.api.exception.ErrorCode; import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.plugin.common.InitContext; import com.tencent.polaris.api.plugin.route.RouteInfo; import com.tencent.polaris.api.plugin.route.RouteResult; -import com.tencent.polaris.api.pojo.*; -import com.tencent.polaris.api.rpc.RequestBaseEntity; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceInstances; +import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.CollectionUtils; -import com.tencent.polaris.api.utils.CompareUtils; import com.tencent.polaris.api.utils.RuleUtils; import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.client.flow.BaseFlow; -import com.tencent.polaris.client.flow.DefaultFlowControlParam; -import com.tencent.polaris.client.flow.ResourcesResponse; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.metadata.core.MessageMetadataContainer; -import com.tencent.polaris.metadata.core.MetadataContainer; import com.tencent.polaris.metadata.core.MetadataType; import com.tencent.polaris.metadata.core.TransitiveType; import com.tencent.polaris.metadata.core.manager.MetadataContext; import com.tencent.polaris.metadata.core.manager.MetadataContextHolder; import com.tencent.polaris.plugins.router.common.AbstractServiceRouter; -import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto; import org.slf4j.Logger; -import java.util.*; -import java.util.function.Function; -import java.util.stream.Collectors; - public class LaneRouter extends AbstractServiceRouter { private static final Logger LOG = LoggerFactory.getLogger(LaneRouter.class); - /** * 处于泳道内的实例标签 */ @@ -64,35 +63,6 @@ public class LaneRouter extends AbstractServiceRouter { */ public static final String TRAFFIC_STAIN_LABEL = "service-lane"; - private static final String GATEWAY_SELECTOR = "polarismesh.cn/gateway/spring-cloud-gateway"; - - private static final String SERVICE_SELECTOR = "polarismesh.cn/service"; - - private final Function> ruleGetter = serviceKey -> { - if (Objects.isNull(serviceKey)) { - return Collections.emptyList(); - } - if (StringUtils.isBlank(serviceKey.getService()) || StringUtils.isBlank(serviceKey.getNamespace())) { - return Collections.emptyList(); - } - - DefaultFlowControlParam engineFlowControlParam = new DefaultFlowControlParam(); - BaseFlow.buildFlowControlParam(new RequestBaseEntity(), extensions.getConfiguration(), engineFlowControlParam); - Set routerKeys = new HashSet<>(); - ServiceEventKey dstSvcEventKey = ServiceEventKey.builder().serviceKey(serviceKey).eventType(ServiceEventKey.EventType.LANE_RULE).build(); - routerKeys.add(dstSvcEventKey); - DefaultServiceEventKeysProvider svcKeysProvider = new DefaultServiceEventKeysProvider(); - svcKeysProvider.setSvcEventKeys(routerKeys); - ResourcesResponse resourcesResponse = BaseFlow - .syncGetResources(extensions, false, svcKeysProvider, engineFlowControlParam); - ServiceRule outbound = resourcesResponse.getServiceRule(dstSvcEventKey); - Object rule = outbound.getRule(); - if (Objects.nonNull(rule)) { - return ((ResponseProto.DiscoverResponse) rule).getLanesList(); - } - return Collections.emptyList(); - }; - @Override public void init(InitContext ctx) throws PolarisException { @@ -117,7 +87,7 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw ServiceKey caller = routeInfo.getSourceService() == null ? null : routeInfo.getSourceService().getServiceKey(); ServiceKey callee = instances.getServiceKey() == null ? null : instances.getServiceKey(); - LaneRuleContainer container = fetchLaneRules(manager, caller, callee); + LaneRuleContainer container = LaneUtils.fetchLaneRules(manager, caller, callee, extensions); // get callee lane group list List laneGroupList = container.getGroupListByCalleeNamespaceAndService(callee); @@ -141,7 +111,7 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw LaneProto.LaneRule laneRule = targetRule.get(); // 尝试进行流量染色动作,该操作仅在当前 Caller 服务为泳道入口时操作 - boolean stainOK = tryStainCurrentTraffic(manager, caller, container, laneRule); + boolean stainOK = LaneUtils.tryStainCurrentTraffic(manager, caller, container, laneRule); if (!stainOK) { // 如果染色失败,即当前 Caller 不是泳道入口,不需要进行染色,只需要将已有的泳道标签进行透传 if (alreadyStain) { @@ -167,7 +137,7 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw private List tryRedirectToLane(LaneRuleContainer container, LaneProto.LaneRule rule, List laneGroupList, ServiceInstances instances) { - LaneProto.LaneGroup group = container.groups.get(rule.getGroupName()); + LaneProto.LaneGroup group = container.getGroups().get(rule.getGroupName()); if (Objects.isNull(group)) { LOG.debug("not found lane_group, redirect to base, lane_rule: {}, lane_group: {}, callee: {}", rule.getName(), rule.getGroupName(), instances.getServiceKey()); // 泳道组不存在,直接认为不需要过滤实例, 默认转发至基线实例 @@ -237,251 +207,4 @@ private List redirectToBase(List laneGroupList, S return true; }).collect(Collectors.toList()); } - - private boolean tryStainCurrentTraffic(MetadataContext manager, ServiceKey caller, LaneRuleContainer container, LaneProto.LaneRule rule) { - if (Objects.isNull(caller)) { - LOG.debug("caller is null, stain current traffic ignore, lane_rule: {}, lane_group: {}", rule.getName(), rule.getGroupName()); - return false; - } - - LaneProto.LaneGroup group = container.groups.get(rule.getGroupName()); - if (Objects.isNull(group)) { - // 泳道规则存在,但是对应的泳道组却不存在,这种情况需要直接抛出异常 - LOG.error("lane_group where lane_rule located not found, lane_rule: {}, lane_group: {}", rule.getName(), rule.getGroupName()); - throw new PolarisException(ErrorCode.INVALID_STATE, "lane_group where lane_rule located not found"); - } - - boolean needStain = isTrafficEntry(group, manager, caller); - if (needStain) { - MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); - metadataContainer.setHeader(TRAFFIC_STAIN_LABEL, buildStainLabel(rule), TransitiveType.PASS_THROUGH); - } - LOG.debug("stain current traffic: {}, lane_rule: {}, lane_group: {}, caller: {}", needStain, rule.getName(), rule.getGroupName(), caller); - return needStain; - } - - private LaneRuleContainer fetchLaneRules(MetadataContext manager, ServiceKey caller, ServiceKey callee) { - // 获取泳道规则 - List result = new ArrayList<>(); - if (Objects.nonNull(caller)) { - result.addAll(ruleGetter.apply(caller)); - } - if (Objects.nonNull(callee)) { - result.addAll(ruleGetter.apply(callee)); - } - return new LaneRuleContainer(manager, caller, result); - } - - private static boolean isTrafficEntry(LaneProto.LaneGroup group, MetadataContext manager, ServiceKey caller) { - boolean result = false; - for (LaneProto.TrafficEntry entry : group.getEntriesList()) { - try { - switch (entry.getType()) { - case GATEWAY_SELECTOR: - LaneProto.ServiceGatewaySelector gatewaySelector = entry.getSelector().unpack(LaneProto.ServiceGatewaySelector.class); - if (RuleUtils.matchService(caller, gatewaySelector.getNamespace(), gatewaySelector.getService()) - && RuleUtils.matchMetadata(gatewaySelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { - result = true; - } - break; - case SERVICE_SELECTOR: - LaneProto.ServiceSelector serviceSelector = entry.getSelector().unpack(LaneProto.ServiceSelector.class); - if (RuleUtils.matchService(caller, serviceSelector.getNamespace(), serviceSelector.getService()) - && RuleUtils.matchMetadata(serviceSelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { - result = true; - } - break; - } - } catch (InvalidProtocolBufferException invalidProtocolBufferException) { - LOG.warn("lane_group: {} unpack traffic entry selector fail", group.getName(), invalidProtocolBufferException); - } - } - return result; - } - - private static class LaneRuleContainer { - private final Map groups = new HashMap<>(); - - private final List rules = new LinkedList<>(); - - private final Map ruleMapping = new HashMap<>(); - - LaneRuleContainer(MetadataContext manager, ServiceKey caller, List list) { - list.forEach(laneGroup -> { - if (groups.containsKey(laneGroup.getName())) { - LOG.warn("lane group: {} duplicate, ignore", laneGroup.getName()); - return; - } - groups.put(laneGroup.getName(), laneGroup); - laneGroup.getRulesList().forEach(laneRule -> { - if (!laneRule.getEnable()) { - return; - } - String name = buildStainLabel(laneRule); - ruleMapping.put(name, laneRule); - rules.add(laneRule); - }); - }); - - rules.sort((o1, o2) -> { - // 主调泳道入口规则优先 - boolean b1 = isTrafficEntry(groups.get(o1.getGroupName()), manager, caller); - boolean b2 = isTrafficEntry(groups.get(o2.getGroupName()), manager, caller); - int entryResult = CompareUtils.compareBoolean(b1, b2); - if (entryResult != 0) { - return entryResult; - } - - // 比较优先级,数字越小,规则优先级越大 - return o1.getPriority() - o2.getPriority(); - }); - } - - public List getGroupListByCalleeNamespaceAndService(ServiceKey callee) { - List groupList = new ArrayList<>(); - for (LaneProto.LaneGroup group : groups.values()) { - for (RoutingProto.DestinationGroup destinationGroup : group.getDestinationsList()) { - if (RuleUtils.matchService(callee, destinationGroup.getNamespace(), destinationGroup.getService())) { - groupList.add(group); - } - } - } - return groupList; - } - - public Optional matchRule(String labelValue) { - LaneProto.LaneRule rule = ruleMapping.get(labelValue); - return Optional.ofNullable(rule); - } - - public Optional matchRule(RouteInfo routeInfo, MetadataContext manager) { - // 当前流量无染色,根据泳道规则进行匹配判断 - LaneProto.LaneRule targetRule = null; - for (LaneProto.LaneRule rule : rules) { - if (!rule.getEnable()) { - continue; - } - - LaneProto.TrafficMatchRule matchRule = rule.getTrafficMatchRule(); - - List booleans = new LinkedList<>(); - matchRule.getArgumentsList().forEach(sourceMatch -> { - String trafficValue = findTrafficValue(routeInfo, sourceMatch, manager); - switch (sourceMatch.getValue().getValueType()) { - case TEXT: - // 直接匹配 - boolean a = StringUtils.isNotBlank(trafficValue) && - RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, - sourceMatch.getValue().getValue().getValue()); - booleans.add(a); - break; - case VARIABLE: - boolean match = false; - String parameterKey = sourceMatch.getValue().getValue().getValue(); - // 外部参数来源 - Optional parameter = routeInfo.getExternalParameterSupplier().apply(parameterKey); - if (parameter.isPresent()) { - match = RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, - parameter.get()); - } - if (!match) { - match = RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, - System.getenv(parameterKey)); - } - booleans.add(match); - break; - } - }); - - boolean isMatched = false; - switch (matchRule.getMatchMode()) { - case OR: - for (Boolean a : booleans) { - isMatched = isMatched || a; - } - break; - case AND: - isMatched = true; - for (Boolean a : booleans) { - isMatched = isMatched && a; - } - break; - } - - if (!isMatched) { - continue; - } - targetRule = rule; - break; - } - - return Optional.ofNullable(targetRule); - } - } - - private static String findTrafficValue(RouteInfo routeInfo, RoutingProto.SourceMatch sourceMatch, MetadataContext manager) { - Map trafficLabels = routeInfo.getRouterMetadata(ServiceRouterConfig.DEFAULT_ROUTER_LANE); - - MessageMetadataContainer calleeMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); - MetadataContainer calleeCustomContainer = manager.getMetadataContainer(MetadataType.CUSTOM, false); - MessageMetadataContainer callerMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); - - String trafficValue = ""; - switch (sourceMatch.getType()) { - case HEADER: - String headerKey = RouteArgument.ArgumentType.HEADER.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(headerKey)) { - return trafficLabels.get(headerKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getHeader(sourceMatch.getKey())).orElse(callerMessageContainer.getHeader(sourceMatch.getKey())); - break; - case CUSTOM: - String customKey = RouteArgument.ArgumentType.CUSTOM.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(customKey)) { - return trafficLabels.get(customKey); - } - trafficValue = Optional.ofNullable(calleeCustomContainer.getRawMetadataStringValue(sourceMatch.getKey())).orElse(""); - break; - case METHOD: - String methodKey = RouteArgument.ArgumentType.METHOD.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(methodKey)) { - return trafficLabels.get(methodKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getMethod()).orElse(callerMessageContainer.getMethod()); - break; - case CALLER_IP: - String callerIpKey = RouteArgument.ArgumentType.CALLER_IP.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(callerIpKey)) { - return trafficLabels.get(callerIpKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getCallerIP()).orElse(callerMessageContainer.getCallerIP()); - break; - case COOKIE: - String cookieKey = RouteArgument.ArgumentType.COOKIE.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(cookieKey)) { - return trafficLabels.get(cookieKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getCookie(sourceMatch.getKey())).orElse(callerMessageContainer.getCookie(sourceMatch.getKey())); - break; - case QUERY: - String queryKey = RouteArgument.ArgumentType.QUERY.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(queryKey)) { - return trafficLabels.get(queryKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getQuery(sourceMatch.getKey())).orElse(callerMessageContainer.getQuery(sourceMatch.getKey())); - break; - case PATH: - String pathKey = RouteArgument.ArgumentType.PATH.key(sourceMatch.getKey()); - if (trafficLabels.containsKey(pathKey)) { - return trafficLabels.get(pathKey); - } - trafficValue = Optional.ofNullable(calleeMessageContainer.getPath()).orElse(callerMessageContainer.getPath()); - break; - } - return trafficValue; - } - - private static String buildStainLabel(LaneProto.LaneRule rule) { - return rule.getGroupName() + "/" + rule.getName(); - } } diff --git a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRuleContainer.java b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRuleContainer.java new file mode 100644 index 000000000..2425d7345 --- /dev/null +++ b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRuleContainer.java @@ -0,0 +1,213 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.router.lane; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.tencent.polaris.api.plugin.route.RouteInfo; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.utils.CompareUtils; +import com.tencent.polaris.api.utils.RuleUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.metadata.core.manager.MetadataContext; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto; +import org.slf4j.Logger; + +public class LaneRuleContainer { + private static final Logger LOG = LoggerFactory.getLogger(LaneRuleContainer.class); + + private final Map groups = new HashMap<>(); + + private final List rules = new LinkedList<>(); + + private final Map ruleMapping = new HashMap<>(); + + public LaneRuleContainer(MetadataContext manager, ServiceKey caller, List list) { + list.forEach(laneGroup -> { + if (groups.containsKey(laneGroup.getName())) { + LOG.debug("lane group: {} duplicate, ignore", laneGroup.getName()); + return; + } + groups.put(laneGroup.getName(), laneGroup); + laneGroup.getRulesList().forEach(laneRule -> { + if (!laneRule.getEnable()) { + return; + } + String name = LaneUtils.buildStainLabel(laneRule); + ruleMapping.put(name, laneRule); + rules.add(laneRule); + }); + }); + + rules.sort((o1, o2) -> { + // 主调泳道入口规则优先 + boolean b1 = LaneUtils.isTrafficEntry(groups.get(o1.getGroupName()), manager, caller); + boolean b2 = LaneUtils.isTrafficEntry(groups.get(o2.getGroupName()), manager, caller); + int entryResult = CompareUtils.compareBoolean(b1, b2); + if (entryResult != 0) { + return entryResult; + } + + // 比较优先级,数字越小,规则优先级越大 + return o1.getPriority() - o2.getPriority(); + }); + } + + public Map getGroups() { + return groups; + } + + public List getGroupListByCalleeNamespaceAndService(ServiceKey callee) { + List groupList = new ArrayList<>(); + for (LaneProto.LaneGroup group : groups.values()) { + for (RoutingProto.DestinationGroup destinationGroup : group.getDestinationsList()) { + if (RuleUtils.matchService(callee, destinationGroup.getNamespace(), destinationGroup.getService())) { + groupList.add(group); + } + } + } + return groupList; + } + + public Optional matchRule(String labelValue) { + LaneProto.LaneRule rule = ruleMapping.get(labelValue); + return Optional.ofNullable(rule); + } + + public Optional matchRule(RouteInfo routeInfo, MetadataContext manager) { + // 当前流量无染色,根据泳道规则进行匹配判断 + LaneProto.LaneRule targetRule = null; + for (LaneProto.LaneRule rule : rules) { + if (!rule.getEnable()) { + continue; + } + + LaneProto.TrafficMatchRule matchRule = rule.getTrafficMatchRule(); + + List booleans = new LinkedList<>(); + matchRule.getArgumentsList().forEach(sourceMatch -> { + String trafficValue = LaneUtils.findTrafficValue(routeInfo, sourceMatch, manager); + switch (sourceMatch.getValue().getValueType()) { + case TEXT: + // 直接匹配 + boolean a = StringUtils.isNotBlank(trafficValue) && + RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, + sourceMatch.getValue().getValue().getValue()); + booleans.add(a); + break; + case VARIABLE: + boolean match = false; + String parameterKey = sourceMatch.getValue().getValue().getValue(); + // 外部参数来源 + Optional parameter = routeInfo.getExternalParameterSupplier().apply(parameterKey); + if (parameter.isPresent()) { + match = RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, + parameter.get()); + } + if (!match) { + match = RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, + System.getenv(parameterKey)); + } + booleans.add(match); + break; + } + }); + + boolean isMatched = false; + switch (matchRule.getMatchMode()) { + case OR: + for (Boolean a : booleans) { + isMatched = isMatched || a; + } + break; + case AND: + isMatched = true; + for (Boolean a : booleans) { + isMatched = isMatched && a; + } + break; + } + + if (!isMatched) { + continue; + } + targetRule = rule; + break; + } + + return Optional.ofNullable(targetRule); + } + + public Optional matchRule(MetadataContext manager) { + // 当前流量无染色,根据泳道规则进行匹配判断 + LaneProto.LaneRule targetRule = null; + for (LaneProto.LaneRule rule : rules) { + if (!rule.getEnable()) { + continue; + } + + LaneProto.TrafficMatchRule matchRule = rule.getTrafficMatchRule(); + + List booleans = new LinkedList<>(); + matchRule.getArgumentsList().forEach(sourceMatch -> { + String trafficValue = LaneUtils.findTrafficValue(sourceMatch, manager); + switch (sourceMatch.getValue().getValueType()) { + case TEXT: + // 直接匹配 + boolean a = StringUtils.isNotBlank(trafficValue) && + RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, + sourceMatch.getValue().getValue().getValue()); + booleans.add(a); + break; + } + }); + + boolean isMatched = false; + switch (matchRule.getMatchMode()) { + case OR: + for (Boolean a : booleans) { + isMatched = isMatched || a; + } + break; + case AND: + isMatched = true; + for (Boolean a : booleans) { + isMatched = isMatched && a; + } + break; + } + + if (!isMatched) { + continue; + } + targetRule = rule; + break; + } + + return Optional.ofNullable(targetRule); + } + + +} \ No newline at end of file diff --git a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneUtils.java b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneUtils.java new file mode 100644 index 000000000..22774ace1 --- /dev/null +++ b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneUtils.java @@ -0,0 +1,335 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.router.lane; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.route.RouteInfo; +import com.tencent.polaris.api.pojo.DefaultServiceEventKeysProvider; +import com.tencent.polaris.api.pojo.RouteArgument; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.pojo.ServiceRule; +import com.tencent.polaris.api.rpc.RequestBaseEntity; +import com.tencent.polaris.api.utils.RuleUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.flow.BaseFlow; +import com.tencent.polaris.client.flow.DefaultFlowControlParam; +import com.tencent.polaris.client.flow.ResourcesResponse; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.metadata.core.MessageMetadataContainer; +import com.tencent.polaris.metadata.core.MetadataContainer; +import com.tencent.polaris.metadata.core.MetadataType; +import com.tencent.polaris.metadata.core.TransitiveType; +import com.tencent.polaris.metadata.core.manager.MetadataContext; +import com.tencent.polaris.metadata.core.manager.MetadataContextHolder; +import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto; +import org.slf4j.Logger; + +import static com.tencent.polaris.plugins.router.lane.LaneRouter.TRAFFIC_STAIN_LABEL; + +public class LaneUtils { + + private static final Logger LOG = LoggerFactory.getLogger(LaneUtils.class); + + private static final String GATEWAY_SELECTOR = "polarismesh.cn/gateway/spring-cloud-gateway"; + + private static final String SERVICE_SELECTOR = "polarismesh.cn/service"; + + + public static List getLaneGroups(ServiceKey serviceKey, Extensions extensions) { + if (Objects.isNull(serviceKey)) { + return Collections.emptyList(); + } + if (StringUtils.isBlank(serviceKey.getService()) || StringUtils.isBlank(serviceKey.getNamespace())) { + return Collections.emptyList(); + } + + DefaultFlowControlParam engineFlowControlParam = new DefaultFlowControlParam(); + BaseFlow.buildFlowControlParam(new RequestBaseEntity(), extensions.getConfiguration(), engineFlowControlParam); + Set routerKeys = new HashSet<>(); + ServiceEventKey dstSvcEventKey = ServiceEventKey.builder().serviceKey(serviceKey).eventType(ServiceEventKey.EventType.LANE_RULE).build(); + routerKeys.add(dstSvcEventKey); + DefaultServiceEventKeysProvider svcKeysProvider = new DefaultServiceEventKeysProvider(); + svcKeysProvider.setSvcEventKeys(routerKeys); + ResourcesResponse resourcesResponse = BaseFlow + .syncGetResources(extensions, false, svcKeysProvider, engineFlowControlParam); + ServiceRule outbound = resourcesResponse.getServiceRule(dstSvcEventKey); + Object rule = outbound.getRule(); + if (Objects.nonNull(rule)) { + return ((ResponseProto.DiscoverResponse) rule).getLanesList(); + } + return Collections.emptyList(); + } + + public static boolean tryStainCurrentTraffic(MetadataContext manager, ServiceKey caller, LaneRuleContainer container, LaneProto.LaneRule rule) { + if (Objects.isNull(caller)) { + LOG.debug("caller is null, stain current traffic ignore, lane_rule: {}, lane_group: {}", rule.getName(), rule.getGroupName()); + return false; + } + + LaneProto.LaneGroup group = container.getGroups().get(rule.getGroupName()); + if (Objects.isNull(group)) { + // 泳道规则存在,但是对应的泳道组却不存在,这种情况需要直接抛出异常 + LOG.error("lane_group where lane_rule located not found, lane_rule: {}, lane_group: {}", rule.getName(), rule.getGroupName()); + throw new PolarisException(ErrorCode.INVALID_STATE, "lane_group where lane_rule located not found"); + } + + boolean needStain = isTrafficEntry(group, manager, caller); + if (needStain) { + MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + metadataContainer.setHeader(LaneRouter.TRAFFIC_STAIN_LABEL, buildStainLabel(rule), TransitiveType.PASS_THROUGH); + } + LOG.debug("stain current traffic: {}, lane_rule: {}, lane_group: {}, caller: {}", needStain, rule.getName(), rule.getGroupName(), caller); + return needStain; + } + + public static LaneRuleContainer fetchLaneRules(MetadataContext manager, ServiceKey caller, ServiceKey callee, Extensions extensions) { + // 获取泳道规则 + List result = new ArrayList<>(); + if (Objects.nonNull(caller)) { + result.addAll(getLaneGroups(caller, extensions)); + } + if (Objects.nonNull(callee)) { + result.addAll(getLaneGroups(callee, extensions)); + } + return new LaneRuleContainer(manager, caller, result); + } + + public static boolean isTrafficEntry(LaneProto.LaneGroup group, MetadataContext manager, ServiceKey caller) { + boolean result = false; + for (LaneProto.TrafficEntry entry : group.getEntriesList()) { + try { + switch (entry.getType()) { + case GATEWAY_SELECTOR: + LaneProto.ServiceGatewaySelector gatewaySelector = entry.getSelector().unpack(LaneProto.ServiceGatewaySelector.class); + if (RuleUtils.matchService(caller, gatewaySelector.getNamespace(), gatewaySelector.getService()) + && RuleUtils.matchMetadata(gatewaySelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { + result = true; + } + break; + case SERVICE_SELECTOR: + LaneProto.ServiceSelector serviceSelector = entry.getSelector().unpack(LaneProto.ServiceSelector.class); + if (RuleUtils.matchService(caller, serviceSelector.getNamespace(), serviceSelector.getService()) + && RuleUtils.matchMetadata(serviceSelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { + result = true; + } + break; + } + } catch (InvalidProtocolBufferException invalidProtocolBufferException) { + LOG.warn("lane_group: {} unpack traffic entry selector fail", group.getName(), invalidProtocolBufferException); + } + } + return result; + } + + + + public static String findTrafficValue(RouteInfo routeInfo, RoutingProto.SourceMatch sourceMatch, MetadataContext manager) { + Map trafficLabels = routeInfo.getRouterMetadata(ServiceRouterConfig.DEFAULT_ROUTER_LANE); + + MessageMetadataContainer calleeMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + MetadataContainer calleeCustomContainer = manager.getMetadataContainer(MetadataType.CUSTOM, false); + MessageMetadataContainer callerMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + + String trafficValue = ""; + switch (sourceMatch.getType()) { + case HEADER: + String headerKey = RouteArgument.ArgumentType.HEADER.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(headerKey)) { + return trafficLabels.get(headerKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getHeader(sourceMatch.getKey())).orElse(callerMessageContainer.getHeader(sourceMatch.getKey())); + break; + case CUSTOM: + String customKey = RouteArgument.ArgumentType.CUSTOM.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(customKey)) { + return trafficLabels.get(customKey); + } + trafficValue = Optional.ofNullable(calleeCustomContainer.getRawMetadataStringValue(sourceMatch.getKey())).orElse(""); + break; + case METHOD: + String methodKey = RouteArgument.ArgumentType.METHOD.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(methodKey)) { + return trafficLabels.get(methodKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getMethod()).orElse(callerMessageContainer.getMethod()); + break; + case CALLER_IP: + String callerIpKey = RouteArgument.ArgumentType.CALLER_IP.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(callerIpKey)) { + return trafficLabels.get(callerIpKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getCallerIP()).orElse(callerMessageContainer.getCallerIP()); + break; + case COOKIE: + String cookieKey = RouteArgument.ArgumentType.COOKIE.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(cookieKey)) { + return trafficLabels.get(cookieKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getCookie(sourceMatch.getKey())).orElse(callerMessageContainer.getCookie(sourceMatch.getKey())); + break; + case QUERY: + String queryKey = RouteArgument.ArgumentType.QUERY.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(queryKey)) { + return trafficLabels.get(queryKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getQuery(sourceMatch.getKey())).orElse(callerMessageContainer.getQuery(sourceMatch.getKey())); + break; + case PATH: + String pathKey = RouteArgument.ArgumentType.PATH.key(sourceMatch.getKey()); + if (trafficLabels.containsKey(pathKey)) { + return trafficLabels.get(pathKey); + } + trafficValue = Optional.ofNullable(calleeMessageContainer.getPath()).orElse(callerMessageContainer.getPath()); + break; + } + return trafficValue; + } + + public static String findTrafficValue(RoutingProto.SourceMatch sourceMatch, MetadataContext manager) { + + MessageMetadataContainer calleeMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + MetadataContainer calleeCustomContainer = manager.getMetadataContainer(MetadataType.CUSTOM, false); + MessageMetadataContainer callerMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + + String trafficValue = ""; + switch (sourceMatch.getType()) { + case HEADER: + trafficValue = Optional.ofNullable(calleeMessageContainer.getHeader(sourceMatch.getKey())).orElse(callerMessageContainer.getHeader(sourceMatch.getKey())); + break; + case CUSTOM: + trafficValue = Optional.ofNullable(calleeCustomContainer.getRawMetadataStringValue(sourceMatch.getKey())).orElse(""); + break; + case METHOD: + trafficValue = Optional.ofNullable(calleeMessageContainer.getMethod()).orElse(callerMessageContainer.getMethod()); + break; + case CALLER_IP: + trafficValue = Optional.ofNullable(calleeMessageContainer.getCallerIP()).orElse(callerMessageContainer.getCallerIP()); + break; + case COOKIE: + trafficValue = Optional.ofNullable(calleeMessageContainer.getCookie(sourceMatch.getKey())).orElse(callerMessageContainer.getCookie(sourceMatch.getKey())); + break; + case QUERY: + trafficValue = Optional.ofNullable(calleeMessageContainer.getQuery(sourceMatch.getKey())).orElse(callerMessageContainer.getQuery(sourceMatch.getKey())); + break; + case PATH: + trafficValue = Optional.ofNullable(calleeMessageContainer.getPath()).orElse(callerMessageContainer.getPath()); + break; + } + return trafficValue; + } + + public static String buildStainLabel(LaneProto.LaneRule rule) { + return rule.getGroupName() + "/" + rule.getName(); + } + + /** + * 根据 caller 信息,匹配泳道,用于 mq 泳道等场景. + */ + public static String fetchLaneByCaller(Extensions extensions, String namespace, String serviceName) { + MetadataContext manager = MetadataContextHolder.getOrCreate(); + MessageMetadataContainer callerMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + ServiceKey caller = new ServiceKey(namespace, serviceName); + + LaneRuleContainer container = LaneUtils.fetchLaneRules(manager, caller, null, extensions); + + // 判断当前流量是否已存在染色,如果存在,则直接返回 + String stainLabel = callerMsgContainer.getHeader(TRAFFIC_STAIN_LABEL); + if (StringUtils.isNotBlank(stainLabel)) { + MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + metadataContainer.setHeader(LaneRouter.TRAFFIC_STAIN_LABEL, stainLabel, TransitiveType.PASS_THROUGH); + return stainLabel; + } + // 当前流量无染色,根据泳道规则进行匹配判断 + Optional targetRule = container.matchRule(manager); + + // 泳道规则不存在,直接返回 + if (!targetRule.isPresent()) { + return null; + } + + LaneProto.LaneRule laneRule = targetRule.get(); + // 尝试进行流量染色动作,该操作仅在当前 Caller 服务为泳道入口时操作 + LaneUtils.tryStainCurrentTraffic(manager, caller, container, laneRule); + + MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + return metadataContainer.getHeader(LaneRouter.TRAFFIC_STAIN_LABEL); + } + + /** + * Sets the lane ID for the caller. + *

+ * This method is used in two primary scenarios: + *

    + *
  1. Standard Polaris Producer: When the message queue producer is a standard Polaris implementation, + * the lane ID is automatically set within the consumer's {@code KafkaLaneAspect}.
  2. + *
  3. Custom MQ Producer: When the producer is a custom implementation, + * the lane ID must be explicitly set by the developer in the consumer code, + * prior to the execution of the {@code KafkaLaneAspect}.
  4. + *
+ * + * @param laneId the lane identifier to be associated with the caller + */ + public static void setCallerLaneId(String laneId) { + MetadataContext manager = MetadataContextHolder.getOrCreate(); + MessageMetadataContainer callerMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + callerMsgContainer.setHeader(LaneRouter.TRAFFIC_STAIN_LABEL, laneId, TransitiveType.PASS_THROUGH); + } + + /** + * get upstream lane id. + * @return lane id of upstream service + */ + public static String getCallerLaneId() { + MetadataContext manager = MetadataContextHolder.getOrCreate(); + MessageMetadataContainer callerMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + return callerMsgContainer.getHeader(LaneRouter.TRAFFIC_STAIN_LABEL); + } + + public static void removeCallerLaneId() { + MetadataContext manager = MetadataContextHolder.getOrCreate(); + MessageMetadataContainer callerMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + callerMsgContainer.setHeader(LaneRouter.TRAFFIC_STAIN_LABEL, "", TransitiveType.PASS_THROUGH); + } + + /** + * get current lane id. + * @return lane id of current service + */ + public static String getCalleeLaneId() { + MetadataContext manager = MetadataContextHolder.getOrCreate(); + MessageMetadataContainer calleeMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + return calleeMsgContainer.getHeader(LaneRouter.TRAFFIC_STAIN_LABEL); + } +}