前提
在微服务架构中,下游依赖出现问题如果上游调用方不做请求降级处理,下游的异常依赖没有被隔离,很有可能出现因为一两个服务或者小到一两个接口异常导致上游所有服务不可用,甚至影响整个业务线。请求降级处理目前比较主流的依然是Netfilx
出品的Hystrix
。Hystrix
的工作原理是:
- 把请求基于线程池或者信号量隔离,一旦下游服务在指定配置的超时时间内无法响应会进入预设或者默认的降级实现。
- 每个请求的状态都会记录下来,在一个滑动窗口内处理失败的比率超过设定的阈值就会触发熔断器(Circle Breaker)开启,熔断器开启之后所有请求都会直接进入预设或者默认的降级逻辑。
- 熔断器打开后,且距离熔断器打开的时间或上一次试探请求放行的时间超过设定值,熔断器器进入半开状态,允许放行一个试探请求。
- 请求成功率提高后,基于统计数据确定对熔断器进行关闭,所有请求正常放行。
这里不对Hystrix
的细节做更深入分析,而是接着谈谈Spring Cloud Gateway
中如何使用Hystrix
,主要包括内置的Hystrix
过滤器和定制过滤器结合Hystrix
实现我们想要的功能。除了要引入spring-cloud-starter-gateway
依赖之外,还需要引入spring-cloud-starter-netflix-hystrix
。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> </dependencies>
|
使用内置的Hystrix过滤器
内置的Hystrix
过滤器是HystrixGatewayFilterFactory
,它支持的配置是:
public static class Config { private String name; private Setter setter; private URI fallbackUri;
public String getName() { return name; }
public Config setName(String name) { this.name = name; return this; }
public Config setFallbackUri(String fallbackUri) { if (fallbackUri != null) { setFallbackUri(URI.create(fallbackUri)); } return this; }
public URI getFallbackUri() { return fallbackUri; } public void setFallbackUri(URI fallbackUri) { if (fallbackUri != null && !"forward".equals(fallbackUri.getScheme())) { throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri); } this.fallbackUri = fallbackUri; }
public Config setSetter(Setter setter) { this.setter = setter; return this; } }
|
另外,(1)全局的Hystrix
配置也会对HystrixGatewayFilterFactory
生效;(2)HystrixGatewayFilterFactory
可以作为默认过滤器(default-filters)对所有的路由配置作为兜底过滤器并发挥作用。
对于第(1)点,我们如果在application.yaml
中配置如下:
// 执行超时时间为1秒,会对下面路由order_route绑定的HystrixGatewayFilterFactory生效 hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds: 1000
spring: cloud: gateway: routes: - id: order_route uri: http://localhost:9091 predicates: - Path=/order/** filters: - name: Hystrix args: name: HystrixCommand fallbackUri: forward:/fallback
|
配置的hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds
会对绑定在路由order_route中的HystrixGatewayFilterFactory
生效。
对于第(2)点,我们可以把HystrixGatewayFilterFactory
配置为默认过滤器,这样子所有的路由都会关联此过滤器,但是非必要时建议不要这样做:
spring: cloud: gateway: routes: - id: order_route uri: http://localhost:9091 predicates: - Path=/order/** default-filters: - name: Hystrix args: name: HystrixCommand fallbackUri: forward:/fallback
|
笔者在测试的时候,发现上面提到的Setter
无法配置,估计是由于Hystrix
的Setter
对象是经过多重包装,暂时没有办法设置该属性。接着我们要在网关服务加一个控制器方法用于处理重定向的/fallback
请求:
@RestController public class FallbackController {
@RequestMapping(value = "/fallback") @ResponseStatus public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) { Map<String, Object> result = new HashMap<>(8); ServerHttpRequest request = exchange.getRequest(); result.put("path", request.getPath().pathWithinApplication().value()); result.put("method", request.getMethodValue()); if (null != throwable.getCause()) { result.put("message", throwable.getCause().getMessage()); } else { result.put("message", throwable.getMessage()); } return Mono.just(result); } }
|
控制器方法入参会被Spring Cloud Gateway
的内部组件处理,可以回调一些有用的类型例如ServerWebExchange
实例、具体的异常实例等等。
使用Hystrix定制过滤器
HystrixGatewayFilterFactory
在大多数情况下应该可以满足业务需要,但是这里也做一次定制一个整合Hystrix
的过滤器,实现的功能如下:
- 基于每个请求URL创建一个新的
Hystrix
命令实例进行调用。
- 每个URL可以指定特有的线程池配置,如果不指定则使用默认的。
- 每个URL可以配置单独的
Hystrix
超时时间。
也就是通过Hystrix
使用线程池对每种不同的外部请求URL进行隔离。当然,这样的过滤器仅仅在外部请求的不同URL的数量有限的情况下才比较合理,否则有可能创建过多的线程池造成系统性能的下降,适得其反。改造如下:
@Component public class CustomHystrixFilter extends AbstractGatewayFilterFactory<CustomHystrixFilter.Config> {
private static final String FORWARD_KEY = "forward"; private static final String NAME = "CustomHystrix"; private static final int TIMEOUT_MS = 1000; private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider; private volatile DispatcherHandler dispatcherHandler; private boolean processConfig = false;
public CustomHystrixFilter(ObjectProvider<DispatcherHandler> dispatcherHandlerProvider) { super(Config.class); this.dispatcherHandlerProvider = dispatcherHandlerProvider; }
private DispatcherHandler getDispatcherHandler() { if (dispatcherHandler == null) { dispatcherHandler = dispatcherHandlerProvider.getIfAvailable(); }
return dispatcherHandler; }
@Override public List<String> shortcutFieldOrder() { return Collections.singletonList(NAME_KEY); }
@Override public GatewayFilter apply(Config config) { processConfig(config); return (exchange, chain) -> { ServerHttpRequest request = exchange.getRequest(); String path = request.getPath().pathWithinApplication().value(); int timeout = config.getTimeout().getOrDefault(path, TIMEOUT_MS); CustomHystrixCommand command = new CustomHystrixCommand(config.getFallbackUri(), exchange, chain, timeout, path); return Mono.create(s -> { Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success); s.onCancel(sub::unsubscribe); }).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> { if (throwable instanceof HystrixRuntimeException) { HystrixRuntimeException e = (HystrixRuntimeException) throwable; HystrixRuntimeException.FailureType failureType = e.getFailureType(); switch (failureType) { case TIMEOUT: return Mono.error(new TimeoutException()); case COMMAND_EXCEPTION: { Throwable cause = e.getCause(); if (cause instanceof ResponseStatusException || AnnotatedElementUtils .findMergedAnnotation(cause.getClass(), ResponseStatus.class) != null) { return Mono.error(cause); } } default: break; } } return Mono.error(throwable); }).then(); }; }
private void processConfig(Config config) { if (!processConfig) { processConfig = true; if (null != config.getTimeout()) { Map<String, Integer> timeout = new HashMap<>(8); config.getTimeout().forEach((k, v) -> { String key = k.replace("-", "/"); if (!key.startsWith("/")) { key = "/" + key; } timeout.put(key, v); }); config.setTimeout(timeout); } } }
@Override public String name() { return NAME; }
private class CustomHystrixCommand extends HystrixObservableCommand<Void> {
private final URI fallbackUri; private final ServerWebExchange exchange; private final GatewayFilterChain chain;
public CustomHystrixCommand(URI fallbackUri, ServerWebExchange exchange, GatewayFilterChain chain, int timeout, String key) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key)) .andCommandKey(HystrixCommandKey.Factory.asKey(key)) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeout))); this.fallbackUri = fallbackUri; this.exchange = exchange; this.chain = chain; }
@Override protected Observable<Void> construct() { return RxReactiveStreams.toObservable(this.chain.filter(exchange)); }
@Override protected Observable<Void> resumeWithFallback() { if (null == fallbackUri) { return super.resumeWithFallback(); } URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI requestUrl = UriComponentsBuilder.fromUri(uri) .host(null) .port(null) .uri(this.fallbackUri) .build(encoded) .toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); ServerHttpRequest request = this.exchange.getRequest().mutate().uri(requestUrl).build(); ServerWebExchange mutated = exchange.mutate().request(request).build(); return RxReactiveStreams.toObservable(getDispatcherHandler().handle(mutated)); } }
public static class Config {
private String id; private URI fallbackUri;
private Map<String, Integer> timeout;
public String getId() { return id; }
public Config setId(String id) { this.id = id; return this; }
public URI getFallbackUri() { return fallbackUri; }
public Config setFallbackUri(URI fallbackUri) { if (fallbackUri != null && !FORWARD_KEY.equals(fallbackUri.getScheme())) { throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri); } this.fallbackUri = fallbackUri; return this; }
public Map<String, Integer> getTimeout() { return timeout; }
public Config setTimeout(Map<String, Integer> timeout) { this.timeout = timeout; return this; } } }
|
其实大部分代码和内置的Hystrix
过滤器差不多,只是改了命令改造函数部分和配置加载处理的部分。配置文件如下:
spring: cloud: gateway: routes: - id: hystrix_route uri: http://localhost:9091 predicates: - Host=localhost:9090 filters: - name: CustomHystrix args: id: CustomHystrix fallbackUri: forward:/fallback timeout: order-remote: 2000 application: name: route-server server: port: 9090
|
网关添加一个/fallback
处理控制器如下:
@RestController public class FallbackController {
@RequestMapping(value = "/fallback") @ResponseStatus public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) { Map<String, Object> result = new HashMap<>(8); ServerHttpRequest request = exchange.getRequest(); result.put("path", request.getPath().pathWithinApplication().value()); result.put("method", request.getMethodValue()); if (null != throwable.getCause()) { result.put("message", throwable.getCause().getMessage()); } else { result.put("message", throwable.getMessage()); } return Mono.just(result); } }
|
故意在下游服务打断点:
curl http://localhost:9090/order/remote
响应结果: { "path": "/fallback", "method": "GET", "message": null }
|
刚好符合预期结果。
小结
这篇文章仅仅是对Hystrix
和过滤器应用提供一个可用的例子和解决问题的思路,具体如何使用还是需要针对真实的场景。
(本文完 c-2-d e-a-20190522)