|
| 1 | +## 多版本控制 |
| 2 | + |
| 3 | +该项目是在spring-cloud-ribbon的基础上进行扩展,以实现接口的多个版本的调用及负载均衡,支持feign方式和断路器(spring-cloud-hystrix)。 |
| 4 | + |
| 5 | + |
| 6 | +##### 场景 |
| 7 | +服务A部署了两个实例 serivceA-1,serviceA-2, spring cloud ribbon默认是轮询的方式将请求分别转到两个实例上。如果由于业务原因,服务需要从1.0升级到2.0。 |
| 8 | + |
| 9 | +场景1:将所有服务实例平缓的过度到2.0。 |
| 10 | +场景2:2.0的服务实例需要兼容1.0的服务接口。 |
| 11 | + |
| 12 | + |
| 13 | + |
| 14 | +##### 思路 |
| 15 | +在spring cloud微服务体系中,服务的请求来源无外乎两个方面: |
| 16 | +来源1:外部请求通过网关(zuul)转发而来。 |
| 17 | +来源2:内部服务之间的调用请求。 |
| 18 | +不论网关转发过来的请求,还是内部服务调用过来的请求,都需要ribbon做负载均衡,所以可以扩展ribbon的负载均衡策略从而实现不同版本的请求转发到不同的服务实例上。 |
| 19 | + |
| 20 | + |
| 21 | + |
| 22 | +网关的转发过程是:zuul > hystrix > ribbon |
| 23 | +内部服务调用的过程有两种: |
| 24 | +RestTemplate > hystrix > ribbon |
| 25 | +Feign > hystrix > ribbon |
| 26 | + |
| 27 | +而其中hystrix有一个线程池隔离的能力,会创建另一个线程去请求服务,拥有更好的控制并发访问量、以及服务降级等能力,但是会出现一个问题,就是线程变量(ThreadLocal)的传递问题,这可以通过com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault对象解决。 |
| 28 | + |
| 29 | + |
| 30 | + |
| 31 | + |
| 32 | +##### 代码设计 |
| 33 | +虽然整个项目实现起来代码量不少, 但是在接口设计上, 却只有三个简单的接口负责数据传递,路由的逻辑依然是封装在实现了IRule接口的实现类中(后面分析)。 |
| 34 | + |
| 35 | + |
| 36 | + |
| 37 | +* BambooRibbonConnectionPoint |
| 38 | +这个接口是负责将bamboo跟ribbon连接起来的,将请求的信息, 以及根据业务需要添加的一些路由信息,和获取请求接口的目标版本,还有触发执行LoadBanceRequestTrigger等,都是由该接口的实现类DefaultRibbonConnectionPoint负责实现。 |
| 39 | +```java |
| 40 | +public class DefaultRibbonConnectionPoint implements BambooRibbonConnectionPoint, ApplicationContextAware { |
| 41 | + ... |
| 42 | + @Override |
| 43 | + public void executeConnectPoint(ConnectPointContext connectPointContext) { |
| 44 | + ConnectPointContext.contextLocal.set(connectPointContext); |
| 45 | + BambooRequest bambooRequest = connectPointContext.getBambooRequest(); |
| 46 | + String requestVersion = versionExtractor.extractVersion(bambooRequest); |
| 47 | + BambooRequestContext.initRequestContext(bambooRequest, requestVersion); |
| 48 | + executeBeforeReuqestTrigger(); |
| 49 | + } |
| 50 | + |
| 51 | + @Override |
| 52 | + public void shutdownconnectPoint() { |
| 53 | + try { |
| 54 | + executeAfterReuqestTrigger(); |
| 55 | + } catch (Exception e) { |
| 56 | + ConnectPointContext.getContextLocal().setExcption(e); |
| 57 | + } finally { |
| 58 | + curRequestTriggers.remove(); |
| 59 | + ConnectPointContext.contextLocal.remove(); |
| 60 | + BambooRequestContext.shutdownRequestContext(); |
| 61 | + } |
| 62 | + } |
| 63 | + ... |
| 64 | +} |
| 65 | +``` |
| 66 | + |
| 67 | + |
| 68 | +* RequestVersionExtractor |
| 69 | +这个接口负责获取请求需要访问的目标接口的版本。比如有些接口版本是放在路径上,如:/v1/api/test/get。也有放在uri参数中:/api/test/get?v=1。也有可能放到header中,所以在bamboo抽象出来一个接口, 具体的实现由开发者根据业务去实现。 |
| 70 | + |
| 71 | + |
| 72 | +* LoadBalanceRequestTrigger |
| 73 | +Ribbon请求的触发器,在ribbon请求发起时, 会被执行。这个接口有三个方法,分别是判断是否需要执行的方法(shouldExecute),以及请求之前执行(before)和请求完成之后执行(after),如果出现异常,after方法依然会被执行。 |
| 74 | + |
| 75 | + |
| 76 | +##### 代码实现 |
| 77 | +上面三个接口只是简单的实现了获取请求的目标版本、触发ribbon请求的触发器,以及将信息向下一步传递。在这一段中,将介绍如何与zuul、feign、RestTemplate以及ribbon和hystrix衔接起来。 |
| 78 | + |
| 79 | +* RestTemplate衔接 |
| 80 | +ClientHttpRequestInterceptor是RestTemplate的拦截器接口,可以通过这个接口添加bamboo的逻辑, 从而将RestTemplate和bamboo衔接起来。 |
| 81 | +BambooClientHttpRequestIntercptor是ClientHttpRequestInterceptor接口的实现类,它加入了bamboo的逻辑。 |
| 82 | +```java |
| 83 | +/** |
| 84 | + * 用于@LoadBalance 标记的 RestTemplate,主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 |
| 85 | + */ |
| 86 | +public class BambooClientHttpRequestIntercptor implements ClientHttpRequestInterceptor { |
| 87 | + @Override |
| 88 | + public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { |
| 89 | + |
| 90 | + URI uri = request.getURI(); |
| 91 | + BambooRequest bambooRequest = BambooRequest.builder() |
| 92 | + .serviceId(uri.getHost()) |
| 93 | + .uri(uri.getPath()) |
| 94 | + .ip(RequestIpKeeper.getRequestIp()) |
| 95 | + .addMultiHeaders(request.getHeaders()) |
| 96 | + .addMultiParams(WebUtils.getQueryParams(uri.getQuery())) |
| 97 | + .build(); |
| 98 | + |
| 99 | + ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(bambooRequest).build(); |
| 100 | + try { |
| 101 | + BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); |
| 102 | + return execution.execute(request, body); |
| 103 | + } finally { |
| 104 | + BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); |
| 105 | + } |
| 106 | + } |
| 107 | +} |
| 108 | +``` |
| 109 | + |
| 110 | +* Feign衔接 |
| 111 | +BambooFeignClient类实现了feign.Client接口, 该类是一个代理类,主要的Feign的调用逻辑依然由被代理的类去执行,在该类中添加了bamboo的逻辑,从而将Feign和bamboo衔接起来。 |
| 112 | +```java |
| 113 | +/** |
| 114 | + * 主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 |
| 115 | + */ |
| 116 | +public class BambooFeignClient implements Client { |
| 117 | + |
| 118 | + private Client delegate; |
| 119 | + |
| 120 | + |
| 121 | + public BambooFeignClient(Client delegate) { |
| 122 | + this.delegate = delegate; |
| 123 | + } |
| 124 | + |
| 125 | + @Override |
| 126 | + public Response execute(Request request, Request.Options options) throws IOException { |
| 127 | + URI uri = URI.create(request.url()); |
| 128 | + BambooRequest.Builder builder = BambooRequest.builder() |
| 129 | + .serviceId(uri.getHost()) |
| 130 | + .uri(uri.getPath()) |
| 131 | + .ip(RequestIpKeeper.getRequestIp()) |
| 132 | + .addMultiParams(WebUtils.getQueryParams(uri.getQuery())); |
| 133 | + |
| 134 | + request.headers().entrySet().forEach(entry ->{ |
| 135 | + for (String v : entry.getValue()) { |
| 136 | + builder.addHeader(entry.getKey(), v); |
| 137 | + } |
| 138 | + }); |
| 139 | + |
| 140 | + ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(builder.build()).build(); |
| 141 | + |
| 142 | + try { |
| 143 | + BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); |
| 144 | + return delegate.execute(request, options); |
| 145 | + }finally { |
| 146 | + BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); |
| 147 | + } |
| 148 | + } |
| 149 | +} |
| 150 | +``` |
| 151 | + |
| 152 | + |
| 153 | +* Zuul衔接 |
| 154 | +实现两个ZuulFilter接口,分别是pre和post类型,将bamboo的逻辑加入其中。Pre类型的ZuulFilter获取请求信息,并执行LoadBalanceRequestTrigger#before方法。Post类型的ZuulFilter执行LoadBalanceRequestTrigger#after方法,并清除存在ThradLocal中的相关信息。 |
| 155 | +```java |
| 156 | +/** |
| 157 | + * 主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 |
| 158 | + */ |
| 159 | +public class BambooPreZuulFilter extends ZuulFilter { |
| 160 | + @Override |
| 161 | + public String filterType() { |
| 162 | + return FilterConstants.PRE_TYPE; |
| 163 | + } |
| 164 | + |
| 165 | + @Override |
| 166 | + public int filterOrder() { |
| 167 | + return 10000; |
| 168 | + } |
| 169 | + |
| 170 | + @Override |
| 171 | + public boolean shouldFilter() { |
| 172 | + return true; |
| 173 | + } |
| 174 | + |
| 175 | + @Override |
| 176 | + public Object run() { |
| 177 | + RequestContext context = RequestContext.getCurrentContext(); |
| 178 | + BambooRequest.Builder builder = BambooRequest.builder() |
| 179 | + .serviceId((String)context.get(FilterConstants.SERVICE_ID_KEY)) |
| 180 | + .uri((String)context.get(FilterConstants.REQUEST_URI_KEY)) |
| 181 | + .ip(context.getZuulRequestHeaders().get(FilterConstants.X_FORWARDED_FOR_HEADER.toLowerCase())) |
| 182 | + .addMultiParams(context.getRequestQueryParams()) |
| 183 | + .addHeaders(context.getZuulRequestHeaders()) |
| 184 | + .addHeaders(context.getOriginResponseHeaders().stream().collect(Collectors.toMap(Pair::first, Pair::second))); |
| 185 | + context.getOriginResponseHeaders().forEach(pair-> builder.addHeader(pair.first(), pair.second())); |
| 186 | + |
| 187 | + ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(builder.build()).build(); |
| 188 | + |
| 189 | + BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); |
| 190 | + return null; |
| 191 | + } |
| 192 | + |
| 193 | +} |
| 194 | +``` |
| 195 | +```java |
| 196 | +/** |
| 197 | + * 做一些善后工作。比如删除BambooRequestContext在ThreadLocal中的信息。 |
| 198 | + */ |
| 199 | +public class BambooPostZuulFilter extends ZuulFilter { |
| 200 | + @Override |
| 201 | + public String filterType() { |
| 202 | + return FilterConstants.POST_TYPE; |
| 203 | + } |
| 204 | + |
| 205 | + @Override |
| 206 | + public int filterOrder() { |
| 207 | + return 0; |
| 208 | + } |
| 209 | + |
| 210 | + @Override |
| 211 | + public boolean shouldFilter() { |
| 212 | + return true; |
| 213 | + } |
| 214 | + |
| 215 | + @Override |
| 216 | + public Object run() { |
| 217 | +// BambooRequestContext.shutdownRequestContext(); |
| 218 | + BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); |
| 219 | + return null; |
| 220 | + } |
| 221 | +} |
| 222 | +``` |
| 223 | + |
| 224 | +* Hystrix衔接 |
| 225 | +Hystrix实现降级、断路器等功能,但是在使用线程池隔离时,ThreadLocal存储的信息如何传递下去呢?使用HystrixRequestVariableDefault可以解决这个问题。可以查看com.netflix.hystrix.strategy.concurrency包下的HystrixContexSchedulerAction、HystrixContextCallable、HystrixContextRunnable,它们都有一段相同功能的代码 |
| 226 | +```java |
| 227 | +public class HystrixContextRunnable implements Runnable { |
| 228 | + |
| 229 | + private final Callable<Void> actual; |
| 230 | + private final HystrixRequestContext parentThreadState; |
| 231 | + |
| 232 | + //... |
| 233 | + |
| 234 | + @Override |
| 235 | + public void run() { |
| 236 | + HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); |
| 237 | + try { |
| 238 | + // set the state of this thread to that of its parent |
| 239 | + HystrixRequestContext.setContextOnCurrentThread(parentThreadState); |
| 240 | + // execute actual Callable with the state of the parent |
| 241 | + try { |
| 242 | + actual.call(); |
| 243 | + } catch (Exception e) { |
| 244 | + throw new RuntimeException(e); |
| 245 | + } |
| 246 | + } finally { |
| 247 | + // restore this thread back to its original state |
| 248 | + HystrixRequestContext.setContextOnCurrentThread(existingState); |
| 249 | + } |
| 250 | + } |
| 251 | + |
| 252 | +} |
| 253 | +``` |
| 254 | +parentThreadState也是一个HystrixRequestContext对象,它是在hystrix创建线程之前的,也就是处理http请求的线程的HystrixRequestContext对象,我们一般也是维护这个对象。在使用线程池隔离时,hystrix会将parentThreadState中的信息复到到新线程中,实现跨线程的数据传递,从而在后面的逻辑中可以获取到parentThreadState中维护的信息,包括ribbon的路由信息。在bamboo中,将一步骤的逻辑放到BambooRequestContext中,将BambooRequestContext实例本身传递下去。 |
| 255 | +```java |
| 256 | +public class BambooRequestContext { |
| 257 | + |
| 258 | + private static final Logger log = LoggerFactory.getLogger(BambooRequestContext.class); |
| 259 | + |
| 260 | + private static final HystrixRequestVariableDefault<BambooRequestContext> CURRENT_CONTEXT = new HystrixRequestVariableDefault<BambooRequestContext>(); |
| 261 | + |
| 262 | + |
| 263 | + private final String apiVersion; |
| 264 | + private final BambooRequest bambooRequest; |
| 265 | + private Map<String, Object> params; |
| 266 | + |
| 267 | + |
| 268 | + private BambooRequestContext(BambooRequest bambooRequest, String apiVersion) { |
| 269 | + params = new HashMap<>(); |
| 270 | + this.apiVersion = apiVersion; |
| 271 | + this.bambooRequest = bambooRequest; |
| 272 | + } |
| 273 | + |
| 274 | + |
| 275 | + public static BambooRequestContext currentRequestCentxt() { |
| 276 | + return CURRENT_CONTEXT.get(); |
| 277 | + } |
| 278 | + |
| 279 | + public static void initRequestContext(BambooRequest bambooRequest, String apiVersion) { |
| 280 | + if (!HystrixRequestContext.isCurrentThreadInitialized()) { |
| 281 | + HystrixRequestContext.initializeContext(); |
| 282 | + } |
| 283 | + CURRENT_CONTEXT.set(new BambooRequestContext(bambooRequest, apiVersion)); |
| 284 | + } |
| 285 | + |
| 286 | + public static void shutdownRequestContext() { |
| 287 | + if (HystrixRequestContext.isCurrentThreadInitialized()) { |
| 288 | + HystrixRequestContext.getContextForCurrentThread().shutdown(); |
| 289 | + } |
| 290 | + } |
| 291 | + |
| 292 | + //忽略setter/getter |
| 293 | +} |
| 294 | +``` |
| 295 | + |
| 296 | +* Ribbon 路由规则 |
| 297 | +Bamboo中的BambooZoneAvoidanceRule继承了ZoneAvoidanceRule,所以它会有ZvoidanceRule的一切特性,在此基础上,还加入了版本过滤的逻辑,这个逻辑主要是由BambooApiVersionPredicate实现。从BambooRequestContext中获取请求的接口的版本,如果有该没有获取到版本,就返回true;如果有获取到版本,就获取服务实例的metadata中的version信息,并进行匹配校验,返回结果。 |
| 298 | +```java |
| 299 | +public class BambooApiVersionPredicate extends AbstractServerPredicate { |
| 300 | + |
| 301 | + |
| 302 | + public BambooApiVersionPredicate(BambooZoneAvoidanceRule rule) { |
| 303 | + super(rule); |
| 304 | + } |
| 305 | + |
| 306 | + @Override |
| 307 | + public boolean apply(PredicateKey input) { |
| 308 | + BambooLoadBalancerKey loadBalancerKey = getBambooLoadBalancerKey(input); |
| 309 | + if (loadBalancerKey != null && !StringUtils.isEmpty(loadBalancerKey.getApiVersion())) { |
| 310 | + Map<String, String> serverMetadata = ((BambooZoneAvoidanceRule) this.rule) |
| 311 | + .getServerMetadata(loadBalancerKey.getServiceId(), input.getServer()); |
| 312 | + String versions = serverMetadata.get("versions"); |
| 313 | + return matchVersion(versions, loadBalancerKey.getApiVersion()); |
| 314 | + } |
| 315 | + return true; |
| 316 | + } |
| 317 | + |
| 318 | + private BambooLoadBalancerKey getBambooLoadBalancerKey(PredicateKey input) { |
| 319 | + if(BambooRequestContext.currentRequestCentxt()!=null){ |
| 320 | + BambooRequestContext bambooRequestContext = BambooRequestContext.currentRequestCentxt(); |
| 321 | + String apiVersion = bambooRequestContext.getApiVersion(); |
| 322 | + if(!StringUtils.isEmpty(apiVersion)){ |
| 323 | + return BambooLoadBalancerKey.builder().apiVersion(apiVersion) |
| 324 | + .serviceId(bambooRequestContext.getServiceId()).build(); |
| 325 | + } |
| 326 | + } |
| 327 | + return null; |
| 328 | + } |
| 329 | + //... |
| 330 | +} |
| 331 | +``` |
| 332 | + |
| 333 | +##### 使用说明 |
| 334 | +多版本控制 --> [spring-cloud-mult-version-samples](../spring-cloud-mult-version-samples/README.md) |
0 commit comments