前提 笔者之前在查找Sentinel 相关资料的时候,偶然中找到了Martin Fowler
大神的一篇文章《CircuitBreaker》 。于是花了点时间仔细阅读,顺便温习一下断路器CircuitBreaker
的原理与实现。
CircuitBreaker的原理 现实生活中的熔断器(更多时候被称为保险丝 )是一种安装在电路中用于保证电路安全运行的电子元件。它的外形一般是一个绝缘的玻璃容器包裹着一段固定大小电阻和固定熔点的纤细合金导体,如下图:
电路中,保险丝会和其他用电的原件串联,根据物理公式Q = I^2*R*T
(Q
为热能值,也理解为保险丝熔断的极限热能值,I
为电流中的电流,R
为保险丝固定电阻,T
为时间),如果电路中有其他用电的原件短路,会导致电流I
十分大,导致在T
很小的情况下,计算得到的Q
远大于保险丝熔断的极限热能值,保险丝就会被击穿熔断。这个时候整个电路处于断开状态,从而避免电路过载导致电路中的用电原件损毁。
电路中的电流过大会导致所有电阻比较大的电器发生大量积热,很容易出现火灾,所以保险丝在过去曾经起到巨大的作用。后来出现了更加先进的”空气开关”,漏电开关多数都升级为此实现,保险丝依然会应用在各种的原件中,但是几乎淡出了日常生活触及的视线范围。
记得小时候某个傍晚爷爷拉开了白炽灯,啪的一声整个屋子的电器都停了,突然停电了。他说了句:保险丝”烧”了,等我换一条。换上保险丝把总闸门打上去后供电恢复。
从上面的分析可见:现实中的熔断器是一次性使用的消耗品 ,而且有些场景下需要人为干预(更换)。
软件系统中的CircuitBreaker
在设计上是借鉴了现实生活中熔断器的功能并且做出改良而诞生的一种模式。这个模式出现的背景是:随着软件和计算机网络的发展,以及当前微服务架构的普及 ,应用会部署在不同的计算机上或者同一台计算机的不同进程上,那么需要通过远程调用进行交互。远程调用和单进程的内存态调用的最大区别之一是:远程调用有可能因为各种原因出现调用失败、没有响应的挂起(其实就是无超时期限的等待)或者直到某个超时的期限才返回结果。这些故障会导致调用方的资源被一直占用无法释放(最常见的就是调用方接收请求或者处理请求的线程被长时间挂起):
如果发生故障的被调用方节点刚好是关键节点,并且此故障节点的上游调用者比较多(例如上图中的内部网关),那么级联故障会蔓延,极端情况下甚至会耗尽了整个服务集群中的所有资源。如果在服务的远程调用加入了CircuitBreaker
组件,那么单个服务调用的效果如下:
断路器CircuitBreaker
的基本原理比较简单:将受保护的函数(方法)包装在断路器对象中进行调用,断路器对象将会监视所有的调用相关的数据(主要是统计维度的数据,一般方法参数可以过滤)。一旦出现故障的调用达到了某个阈值或者触发了某些规则,断路器就会切换为Open
状态,所有经由断路器的调用都会快速失败,请求不会到达下游被调用方。笔者认为从实际来看,CircuitBreaker
的核心功能就是三大块:
调用数据度量统计。
维护断路器自身的状态。
基于前两点保护包裹在断路器中执行的调用。
基于调用数据的度量统计一般会引入JDK8
中的原子(Atomic
)类型。下游被调用方不会一直处于故障,为了断路器能够自恢复,引入了Half_Open
状态和滑动窗口的概念。同时,考虑到程序容器的线程阻塞带来的毁灭性影响,有时候可以考虑进行如下优化:断路器把受保护的调用基于定义好的资源标识选择特定的线程池(或者信号量)进行调用,充分利用FutureTask#get(long timeout, TimeUnit unit)
设定调用任务最大超时期限的优势,这样就能基本避免出现故障的远程调用阻塞了本应用容器的线程。
这里的容器是特指Tomcat、Netty、Jetty等。而这里提到的线程池隔离、滑动窗口等概念会在下文具体实现的时候再详细展开。
基于线程池隔离:
直接基于容器线程隔离:
CircuitBreaker的简易实现
这一小节会按照上一小节的理论,设计多种CircuitBreaker
的实现,由简单到复杂一步一步进行迭代。CircuitBreaker
的状态转换设计图如下:
基于此设计图,Martin Fowler
大神在其文章中也给予了伪代码如下:
class ResetCircuitBreaker... // 初始化 def initialize &block @circuit = block @invocation_timeout = 0.01 @failure_threshold = 5 @monitor = BreakerMonitor.new @reset_timeout = 0.1 reset end // 重置 def reset @failure_count = 0 @last_failure_time = nil @monitor.alert :reset_circuit end // 状态维护 def state case when (@failure_count >= @failure_threshold) && (Time.now - @last_failure_time) > @reset_timeout :half_open when (@failure_count >= @failure_threshold) :open else :closed end end // 调用 def call args case state when :closed, :half_open begin do_call args // 这里从描述来看应该是漏了调用reset方法 // reset rescue Timeout::Error record_failure raise $! end when :open raise CircuitBreaker::Open else raise "Unreachable" end end // 记录失败 def record_failure @failure_count += 1 @last_failure_time = Time.now @monitor.alert(:open_circuit) if :open == state end
下面的多种实现的思路都是基于此伪代码的基本框架进行编写。
基于异常阈值不会自恢复的实现 这种实现最简单,也就是只需要维护状态Closed
转向Open
的临界条件即可,可以设定一个异常计数的阈值,然后使用一个原子计数器统计异常数量即可,Java
代码实现如下:
public enum CircuitBreakerStatus { CLOSED, OPEN, HALF_OPEN } @Getter public class SimpleCircuitBreaker { private final long failureThreshold; private final LongAdder failureCounter; private final LongAdder callCounter; private CircuitBreakerStatus status; public SimpleCircuitBreaker (long failureThreshold) { this .failureThreshold = failureThreshold; this .callCounter = new LongAdder(); this .failureCounter = new LongAdder(); this .status = CircuitBreakerStatus.CLOSED; } private final Object fallback = null ; @SuppressWarnings("unchecked") public <T> T call (Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this .status) { return supplier.get(); } } catch (Exception e) { this .failureCounter.increment(); tryChangingStatus(); } finally { this .callCounter.increment(); } return (T) fallback; } private void tryChangingStatus () { if (this .failureThreshold <= this .failureCounter.sum()) { this .status = CircuitBreakerStatus.OPEN; System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]" , CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } public void call (Runnable runnable) { call(() -> { runnable.run(); return null ; }); } }
在多线程调用的前提下,如果在很短时间内有大量的线程中的方法调用出现异常,有可能所有调用都会涌进去tryChangingStatus()
方法,这种情况下会导致CircuitBreaker
的状态被并发修改,可以考虑使用AtomicReference
包裹CircuitBreakerStatus
,做CAS
更新(确保只更新一次)即可。变更的代码如下:
private final AtomicReference<CircuitBreakerStatus> status;public SimpleCircuitBreaker (long failureThreshold) { ...... this .status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); } public <T> T call (Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this .status.get()) { return supplier.get(); } ...... private void tryChangingStatus () { if (this .failureThreshold <= this .failureCounter.sum()) { boolean b = this .status.compareAndSet(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN); if (b) { System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]" , CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } }
并发极高的场景下假设出现调用异常前提下,异常计数器failureCounter
的计数值有可能在一瞬间就远超过了异常阈值failureCounter
,但是一般不考虑对这些计数值的比较或者状态切换的准确时机添加同步机制(例如加锁),因为一旦加入同步机制会大大降低并发性能,这样引入断路器反而成为了性能隐患,显然是不合理的。所以一般设计断路器逻辑的时候,并不需要控制断路器状态切换的具体计数值临界点,保证状态一定切换正常即可。基于此简陋断路器编写一个同步调用的测试例子:
public static class Service { public String process (int i) { System.out.println("进入process方法,number:" + i); throw new RuntimeException(String.valueOf(i)); } } public static void main (String[] args) throws Exception { SimpleCircuitBreaker circuitBreaker = new SimpleCircuitBreaker(5L ); Service service = new Service(); for (int i = 0 ; i < 10 ; i++) { int temp = i; String result = circuitBreaker.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } }
测试结果输出如下:
进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2 进入process方法,number:3 返回结果:null,number:3 进入process方法,number:4 SimpleCircuitBreaker状态转换,[CLOSED]->[OPEN] 返回结果:null,number:4 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9
细心的伙伴会发现,基本上状态的维护和变更和数据统计都位于调用异常或者失败的方法入口以及最后的finally代码块,在真实的调用逻辑前一般只会做状态判断或者下文提到的分配调用资源等。
基于异常阈值并且能够自恢复的实现 基于异常阈值、能够自恢复的CircuitBreaker
实现需要引入Half_Open
状态,同时需要记录最后一次失败调用的时间戳以及reset_timeout
(断路器的当前的系统时间戳减去上一阶段最后一次失败调用的时间差,大于某个值的时候,并且当前的失败调用大于失败阈值则需要把状态重置为Half_Open
,这里的”某个值”定义为reset_timeout
),示意图如下:
假设当前的调用为圆形6
,当前系统时间戳减去(上一轮)最后一个失败调用(圆形5
)的时间戳大于预设的reset_timeout
的时候,不论当次调用是成功还是失败,直到下一次调用失败或者失败调用数降低到转换为Closed
状态之前,都处于Half_Open
状态,会对单个调用进行放行(并发场景下也有可能同时放行多个调用)。代码实现如下:
public enum CircuitBreakerStatusMonitor { X; public void report (String name, CircuitBreakerStatus o, CircuitBreakerStatus n) { System.out.println(String.format("断路器[%s]状态变更,[%s]->[%s]" , name, o, n)); } public void reset (String name) { System.out.println(String.format("断路器[%s]重置" , name)); } } @Getter public class RestCircuitBreaker { private final long failureThreshold; private final long resetTimeout; private LongAdder failureCounter; private LongAdder callCounter; private AtomicReference<CircuitBreakerStatus> status; private final Object fallback = null ; private long lastFailureTime; public RestCircuitBreaker (long failureThreshold, long resetTimeout) { this .failureThreshold = failureThreshold; this .resetTimeout = resetTimeout; reset(); } public void reset () { CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker" ); this .callCounter = new LongAdder(); this .failureCounter = new LongAdder(); this .status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); this .lastFailureTime = -1L ; } @SuppressWarnings("unchecked") public <T> T call (Supplier<T> supplier) { try { if (shouldAllowExecution()) { T result = supplier.get(); markSuccess(); return result; } } catch (Exception e) { markNoneSuccess(); } finally { this .callCounter.increment(); } return (T) fallback; } public void call (Runnable runnable) { call(() -> { runnable.run(); return null ; }); } boolean shouldAllowExecution () { if (lastFailureTime == -1L ) { return true ; } if (failureThreshold > failureCounter.sum()) { return true ; } return shouldTryAfterRestTimeoutWindow() && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN); } boolean changeStatus (CircuitBreakerStatus o, CircuitBreakerStatus n) { boolean r = status.compareAndSet(o, n); if (r) { CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker" , o, n); } return r; } boolean shouldTryAfterRestTimeoutWindow () { long lastFailureTimeSnap = lastFailureTime; long currentTime = System.currentTimeMillis(); return currentTime > lastFailureTimeSnap + resetTimeout; } public void markSuccess () { if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) { reset(); } } public void markNoneSuccess () { this .failureCounter.increment(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } if (this .failureCounter.sum() >= failureThreshold && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } } }
编写一个测试客户端RestCircuitBreakerClient
:
public class RestCircuitBreakerClient { public static void main (String[] args) throws Exception { Service service = new Service(); RestCircuitBreaker cb = new RestCircuitBreaker(5 , 500 ); for (int i = 0 ; i < 10 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } Thread.sleep(501L ); cb.call(service::processSuccess); for (int i = 0 ; i < 3 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } } public static class Service { public String process (int i) { System.out.println("进入process方法,number:" + i); throw new RuntimeException(String.valueOf(i)); } public void processSuccess () { System.out.println("调用processSuccess方法" ); } } }
输出结果如下:
断路器[RestCircuitBreaker]重置 进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2 进入process方法,number:3 返回结果:null,number:3 进入process方法,number:4 断路器[RestCircuitBreaker]状态变更,[CLOSED]->[OPEN] 返回结果:null,number:4 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9 断路器[RestCircuitBreaker]状态变更,[OPEN]->[HALF_OPEN] 调用processSuccess方法 # <------ 这个位置的成功调用重置了断路器的状态 断路器[RestCircuitBreaker]状态变更,[HALF_OPEN]->[CLOSED] 断路器[RestCircuitBreaker]重置 进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2
基于线程池隔离和超时控制 在使用CircuitBreaker
的时候,可以基于不同的资源(唯一标识可以使用resource_key
或者resource_name
)创建单独的线程池,让资源基于线程池进行隔离调用。这种设计的原则借鉴于运货船的船舱设计,每个船舱都使用绝缘的材料进行分隔,一旦某个船舱出现了火情,也不会蔓延到其他船舱。在Java
体系中,可以使用线程池ThreadPoolExecutor#submit(Callable<T> task)
进行指定超时上限限制的任务提交和结果获取,这样就可以预设一个调用超时时间上限,限制每个调用的可用的最大调用时间。
首先需要设计一个轻量级的资源线程池管理模块:
@Data public class CircuitBreakerResourceConf { private String resourceName; private int coreSize; private int queueSize; private long timeout; } public enum CircuitBreakerResourceManager { X; public final Map<String, CircuitBreakerResource> cache = new ConcurrentHashMap<>(8 ); public void register (CircuitBreakerResourceConf conf) { cache.computeIfAbsent(conf.getResourceName(), rn -> { int coreSize = conf.getCoreSize(); int queueSize = conf.getQueueSize(); BlockingQueue<Runnable> queue; if (queueSize > 0 ) { queue = new ArrayBlockingQueue<>(queueSize); } else { queue = new SynchronousQueue<>(); } ThreadPoolExecutor executor = new ThreadPoolExecutor( coreSize, coreSize, 0 , TimeUnit.SECONDS, queue, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread (Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true ); thread.setName(rn + "-CircuitBreakerWorker-" + counter.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.AbortPolicy() ); CircuitBreakerResource resource = new CircuitBreakerResource(); resource.setExecutor(executor); resource.setTimeout(conf.getTimeout()); return resource; }); } public CircuitBreakerResource get (String resourceName) { return Optional.ofNullable(cache.get(resourceName)).orElseThrow(() -> new IllegalArgumentException(resourceName)); } }
编写断路器ResourceCircuitBreaker
的实现代码:
@Getter public class ResourceCircuitBreaker { private final long failureThreshold; private final long resetTimeout; private LongAdder failureCounter; private LongAdder callCounter; private AtomicReference<CircuitBreakerStatus> status; private final ThreadPoolExecutor executor; private final Object fallback = null ; private final String circuitBreakerName; private long lastFailureTime; private final long executionTimeout; public ResourceCircuitBreaker (String resourceName, long failureThreshold, long resetTimeout) { CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName); this .circuitBreakerName = "ResourceCircuitBreaker-" + resourceName; this .executor = resource.getExecutor(); this .executionTimeout = resource.getTimeout(); this .failureThreshold = failureThreshold; this .resetTimeout = resetTimeout; reset(); } public void reset () { CircuitBreakerStatusMonitor.X.reset(this .circuitBreakerName); this .callCounter = new LongAdder(); this .failureCounter = new LongAdder(); this .status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); this .lastFailureTime = -1L ; } @SuppressWarnings("unchecked") public <T> T call (Supplier<T> supplier) { try { if (shouldAllowExecution()) { Future<T> future = this .executor.submit(warp(supplier)); T result = future.get(executionTimeout, TimeUnit.MILLISECONDS); markSuccess(); return result; } } catch (Exception e) { markNoneSuccess(); } finally { this .callCounter.increment(); } return (T) fallback; } <T> Callable<T> warp (Supplier<T> supplier) { return supplier::get; } public void call (Runnable runnable) { call(() -> { runnable.run(); return null ; }); } boolean shouldAllowExecution () { if (lastFailureTime == -1L ) { return true ; } if (failureThreshold > failureCounter.sum()) { return true ; } return shouldTryAfterRestTimeoutWindow() && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN); } boolean changeStatus (CircuitBreakerStatus o, CircuitBreakerStatus n) { boolean r = status.compareAndSet(o, n); if (r) { CircuitBreakerStatusMonitor.X.report(this .circuitBreakerName, o, n); } return r; } boolean shouldTryAfterRestTimeoutWindow () { long lastFailureTimeSnap = lastFailureTime; long currentTime = System.currentTimeMillis(); return currentTime > lastFailureTimeSnap + resetTimeout; } public void markSuccess () { if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) { reset(); } } public void markNoneSuccess () { this .failureCounter.increment(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } if (this .failureCounter.sum() >= failureThreshold && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } } }
编写测试场景类ResourceCircuitBreakerClient
:
public class ResourceCircuitBreakerClient { public static void main (String[] args) throws Exception { CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf(); conf.setCoreSize(10 ); conf.setQueueSize(0 ); conf.setResourceName("SERVICE" ); conf.setTimeout(50 ); CircuitBreakerResourceManager.X.register(conf); Service service = new Service(); ResourceCircuitBreaker cb = new ResourceCircuitBreaker("SERVICE" , 5 , 500 ); for (int i = 0 ; i < 10 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } Thread.sleep(501L ); cb.call(service::processSuccess); for (int i = 0 ; i < 3 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } } public static class Service { private final Random r = new Random(); public String process (int i) { int sleep = r.nextInt(200 ); System.out.println(String.format("线程[%s]-进入process方法,number:%d,休眠%d毫秒" , Thread.currentThread().getName(), i, sleep)); try { Thread.sleep(sleep); } catch (InterruptedException ignore) { } return String.valueOf(i); } public void processSuccess () { System.out.println(String.format("线程[%s]-调用processSuccess方法" , Thread.currentThread().getName())); } } }
某次执行的输出结果如下:
断路器[ResourceCircuitBreaker-SERVICE]重置 线程[SERVICE-CircuitBreakerWorker-0]-进入process方法,number:0,休眠67毫秒 返回结果:null,number:0 线程[SERVICE-CircuitBreakerWorker-1]-进入process方法,number:1,休眠85毫秒 返回结果:null,number:1 线程[SERVICE-CircuitBreakerWorker-2]-进入process方法,number:2,休眠72毫秒 返回结果:null,number:2 线程[SERVICE-CircuitBreakerWorker-3]-进入process方法,number:3,休眠88毫秒 返回结果:null,number:3 线程[SERVICE-CircuitBreakerWorker-4]-进入process方法,number:4,休眠28毫秒 返回结果:4,number:4 线程[SERVICE-CircuitBreakerWorker-5]-进入process方法,number:5,休眠102毫秒 断路器[ResourceCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN] 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9 断路器[ResourceCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN] 线程[SERVICE-CircuitBreakerWorker-6]-调用processSuccess方法 断路器[ResourceCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED] 断路器[ResourceCircuitBreaker-SERVICE]重置 线程[SERVICE-CircuitBreakerWorker-7]-进入process方法,number:0,休眠74毫秒 返回结果:null,number:0 线程[SERVICE-CircuitBreakerWorker-8]-进入process方法,number:1,休眠111毫秒 返回结果:null,number:1 线程[SERVICE-CircuitBreakerWorker-9]-进入process方法,number:2,休眠183毫秒 返回结果:null,number:2
滑动窗口和百分比统计 上一个小节已经实现了资源基于线程池隔离进行调用,但是有一点明显的不足就是:断路器的状态管理和重置并不符合生产场景,HALF_OPEN -> CLOSED
的状态切换和重置不应该在放行单个调用成功之后立刻触发,而应该建立在一定时间范围内,调用的(平均)失败率下降到某个阈值或者调用的(平均)成功率恢复到某个阈值,否则很多场景下会导致断路器的状态频繁发生切换,功能基本处于失效的状态。也就是大多数场景下,一段时间内的failurePercent
会比异常计数和failureThreshold
的直接对比更加准确。可以引入滑动窗口(Sliding Window
)的概念,记录每个时间单元内的调用总次数、调用成功次数、调用超时次数和非超时的调用失败次数,为了简化操作这个时间单元定义为1
秒:
定义一个用于记录这四种调用次数的桶Bucket
类(这里的实现稍微跟上图有点不同,非超时失败修改为线程池拒绝的任务统计,而失败统计包括了任务超时执行和一般的业务异常):
@RequiredArgsConstructor @Getter public class MetricInfo { private final long total; private final long success; private final long failure; private final long reject; public static final MetricInfo EMPTY = new MetricInfo(0 , 0 , 0 , 0 ); public MetricInfo merge (MetricInfo other) { return new MetricInfo( this .total + other.getTotal(), this .success + other.getSuccess(), this .failure + other.getFailure(), this .reject + other.getReject() ); } } public class Bucket { @Getter private final long windowStartTimestamp; private final LongAdder total; private final LongAdder success; private final LongAdder failure; private final LongAdder reject; public Bucket (long windowStartTimestamp) { this .windowStartTimestamp = windowStartTimestamp; this .total = new LongAdder(); this .success = new LongAdder(); this .reject = new LongAdder(); this .failure = new LongAdder(); } public void increaseTotal () { this .total.increment(); } public void increaseSuccess () { this .success.increment(); } public void increaseFailure () { this .failure.increment(); } public void increaseReject () { this .reject.increment(); } public long totalCount () { return this .total.sum(); } public long successCount () { return this .success.sum(); } public long failureCount () { return this .failure.sum(); } public long rejectCount () { return this .reject.sum(); } public void reset () { this .total.reset(); this .success.reset(); this .failure.reset(); this .reject.reset(); } public MetricInfo metricInfo () { return new MetricInfo( totalCount(), successCount(), failureCount(), rejectCount() ); } @Override public String toString () { return String.format("Bucket[wt=%d,t=%d,s=%d,f=%d,r=%d]" , windowStartTimestamp, totalCount(), successCount(), failureCount(), rejectCount() ); } }
在Hystrix
中,为了更加灵活,Bucket
中的计数器设计为LongAdder[]
类型,便于通过各种需要计数事件枚举的顺序值来直接进行计数和累加,而为了节约内存空间,滑动窗口设计成一个容量固定可复用的环形队列BucketCircularArray#ListState
,这里可以站在巨人的肩膀上借鉴其思路实现BucketCircular
:
public class BucketCircular implements Iterable <Bucket > { private final AtomicReference<BucketArray> bucketArray; public BucketCircular (int bucketNumber) { AtomicReferenceArray<Bucket> buckets = new AtomicReferenceArray<>(bucketNumber + 1 ); this .bucketArray = new AtomicReference<>(new BucketArray(buckets, 0 , 0 , bucketNumber)); } public Bucket getTail () { return this .bucketArray.get().tail(); } public void addTail (Bucket bucket) { BucketArray bucketArray = this .bucketArray.get(); BucketArray newBucketArray = bucketArray.addBucket(bucket); this .bucketArray.compareAndSet(bucketArray, newBucketArray); } public Bucket[] toArray() { return this .bucketArray.get().toArray(); } public int size () { return this .bucketArray.get().getSize(); } @Override public Iterator<Bucket> iterator () { return Collections.unmodifiableList(Arrays.asList(toArray())).iterator(); } public void clear () { while (true ) { BucketArray bucketArray = this .bucketArray.get(); BucketArray clear = bucketArray.clear(); if (this .bucketArray.compareAndSet(bucketArray, clear)) { return ; } } } }
添加一个新的Bucket
到循环队列的尾部的时候,因为队列的长度是固定的,需要判断是否需要重新计算头指针和尾指针。测试一下:
public static void main (String[] args) throws Exception { BucketCircular circular = new BucketCircular(5 ); circular.addTail(new Bucket(111L )); circular.addTail(new Bucket(System.currentTimeMillis())); circular.addTail(new Bucket(System.currentTimeMillis())); circular.addTail(new Bucket(System.currentTimeMillis())); circular.addTail(new Bucket(System.currentTimeMillis())); circular.addTail(new Bucket(System.currentTimeMillis())); circular.addTail(new Bucket(222L )); Stream.of(circular.toArray()).forEach(System.out::println); } Bucket[wt=1603613365205 ,t=0 ,s=0 ,f=0 ,r=0 ] Bucket[wt=1603613365205 ,t=0 ,s=0 ,f=0 ,r=0 ] Bucket[wt=1603613365205 ,t=0 ,s=0 ,f=0 ,r=0 ] Bucket[wt=1603613365205 ,t=0 ,s=0 ,f=0 ,r=0 ] Bucket[wt=222 ,t=0 ,s=0 ,f=0 ,r=0 ]
接着编写一个用于管理Bucket
和提供数据统计入口的SlidingWindowMonitor
:
public class BucketCumulativeCalculator { private LongAdder total = new LongAdder(); private LongAdder success = new LongAdder(); private LongAdder failure = new LongAdder(); private LongAdder reject = new LongAdder(); public void addBucket (Bucket lb) { total.add(lb.totalCount()); success.add(lb.successCount()); failure.add(lb.failureCount()); reject.add(lb.rejectCount()); } public MetricInfo sum () { return new MetricInfo( total.sum(), success.sum(), failure.sum(), reject.sum() ); } public void reset () { total = new LongAdder(); success = new LongAdder(); failure = new LongAdder(); reject = new LongAdder(); } } public class SlidingWindowMonitor { private final int windowDuration = 10000 ; private final int bucketSizeInTimeUint = 1000 ; private final int bucketNumber = windowDuration / bucketSizeInTimeUint; private final BucketCircular bucketCircular; private final ReentrantLock lock; private final BucketCumulativeCalculator calculator = new BucketCumulativeCalculator(); public SlidingWindowMonitor () { this .bucketCircular = new BucketCircular(bucketNumber); this .lock = new ReentrantLock(); } void reset () { Bucket tailBucket = bucketCircular.getTail(); if (null != tailBucket) { calculator.addBucket(tailBucket); } bucketCircular.clear(); } public MetricInfo getCumulativeMetricInfo () { return getCurrentMetricInfo().merge(calculator.sum()); } public MetricInfo getCurrentMetricInfo () { Bucket currentBucket = getCurrentBucket(); if (null == currentBucket) { return MetricInfo.EMPTY; } return currentBucket.metricInfo(); } public MetricInfo getRollingMetricInfo () { Bucket currentBucket = getCurrentBucket(); if (null == currentBucket) { return MetricInfo.EMPTY; } MetricInfo info = new MetricInfo(0 , 0 , 0 , 0 ); for (Bucket bucket : this .bucketCircular) { info = info.merge(bucket.metricInfo()); } return info; } Bucket getCurrentBucket () { long time = System.currentTimeMillis(); Bucket tailBucket = bucketCircular.getTail(); if (null != tailBucket && time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) { return tailBucket; } if (lock.tryLock()) { try { if (null == bucketCircular.getTail()) { Bucket newBucket = new Bucket(time); bucketCircular.addTail(newBucket); return newBucket; } else { for (int i = 0 ; i < bucketNumber; i++) { tailBucket = bucketCircular.getTail(); if (time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) { return tailBucket; } else if (time > tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint + windowDuration) { reset(); return getCurrentBucket(); } else { bucketCircular.addTail(new Bucket(tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint)); calculator.addBucket(tailBucket); } } return bucketCircular.getTail(); } } finally { lock.unlock(); } } else { tailBucket = bucketCircular.getTail(); if (null != tailBucket) { return tailBucket; } try { Thread.sleep(5 ); } catch (InterruptedException ignore) { } return getCurrentBucket(); } } public void incrementTotal () { getCurrentBucket().increaseTotal(); } public void incrementSuccess () { getCurrentBucket().increaseSuccess(); } public void incrementFailure () { getCurrentBucket().increaseFailure(); } public void incrementReject () { getCurrentBucket().increaseReject(); } }
最后,把SlidingWindowMonitor
和之前的ResourceCircuitBreaker
做一次融合进化,得到SlidingWindowCircuitBreaker
:
package cn.throwx.cb;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicReference;import java.util.function.Supplier;public class SlidingWindowCircuitBreaker { private final long errorPercentThreshold; private final long resetTimeout; private AtomicReference<CircuitBreakerStatus> status; private final ThreadPoolExecutor executor; private final String circuitBreakerName; private long lastFailureTime; private final long executionTimeout; private final SlidingWindowMonitor slidingWindowMonitor; public SlidingWindowCircuitBreaker (String resourceName, long errorPercentThreshold, long resetTimeout) { CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName); this .circuitBreakerName = "SlidingWindowCircuitBreaker-" + resourceName; this .executor = resource.getExecutor(); this .executionTimeout = resource.getTimeout(); this .errorPercentThreshold = errorPercentThreshold; this .resetTimeout = resetTimeout; this .slidingWindowMonitor = new SlidingWindowMonitor(); reset(); } public void reset () { CircuitBreakerStatusMonitor.X.reset(this .circuitBreakerName); this .status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); this .lastFailureTime = -1L ; } @SuppressWarnings("unchecked") public <T> T call (Supplier<T> supplier) { return call(supplier, (Fallback<T>) Fallback.F); } public <T> T call (Supplier<T> supplier, Fallback<T> fallback) { try { if (shouldAllowExecution()) { slidingWindowMonitor.incrementTotal(); Future<T> future = this .executor.submit(warp(supplier)); T result = future.get(executionTimeout, TimeUnit.MILLISECONDS); markSuccess(); return result; } } catch (RejectedExecutionException ree) { markReject(); } catch (Exception e) { markFailure(); } return fallback.fallback(); } <T> Callable<T> warp (Supplier<T> supplier) { return supplier::get; } public void call (Runnable runnable) { call(() -> { runnable.run(); return null ; }); } boolean shouldAllowExecution () { if (lastFailureTime == -1L ) { return true ; } if (errorPercentThreshold > rollingErrorPercentage()) { return true ; } return shouldTryAfterRestTimeoutWindow() && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN); } boolean changeStatus (CircuitBreakerStatus o, CircuitBreakerStatus n) { boolean r = status.compareAndSet(o, n); if (r) { CircuitBreakerStatusMonitor.X.report(this .circuitBreakerName, o, n); } return r; } boolean shouldTryAfterRestTimeoutWindow () { long lastFailureTimeSnap = lastFailureTime; long currentTime = System.currentTimeMillis(); return currentTime > lastFailureTimeSnap + resetTimeout; } public void markSuccess () { slidingWindowMonitor.incrementSuccess(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) { reset(); } } public void markReject () { slidingWindowMonitor.incrementReject(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } } public int rollingErrorPercentage () { MetricInfo rollingMetricInfo = slidingWindowMonitor.getRollingMetricInfo(); long rejectCount = rollingMetricInfo.getReject(); long failureCount = rollingMetricInfo.getFailure(); long totalCount = rollingMetricInfo.getTotal(); int errorPercentage = (int ) ((double ) (rejectCount + failureCount) / totalCount * 100 ); CircuitBreakerStatusMonitor.X.report(this .circuitBreakerName, String.format("错误百分比:%d" , errorPercentage)); return errorPercentage; } public void markFailure () { slidingWindowMonitor.incrementFailure(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } if (rollingErrorPercentage() >= errorPercentThreshold && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) { this .lastFailureTime = System.currentTimeMillis(); } } }
编写一个测试客户端SlidingWindowCircuitBreakerClient
:
public class SlidingWindowCircuitBreakerClient { public static void main (String[] args) throws Exception { CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf(); conf.setCoreSize(10 ); conf.setQueueSize(0 ); conf.setResourceName("SERVICE" ); conf.setTimeout(50 ); CircuitBreakerResourceManager.X.register(conf); Service service = new Service(); SlidingWindowCircuitBreaker cb = new SlidingWindowCircuitBreaker("SERVICE" , 50 , 500 ); for (int i = 0 ; i < 10 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } Thread.sleep(501L ); cb.call(service::processSuccess); for (int i = 0 ; i < 3 ; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d" , result, temp)); } Thread.sleep(501L ); cb.call(service::processSuccess); cb.call(service::processSuccess); } public static class Service { private final Random r = new Random(); public String process (int i) { int sleep = r.nextInt(200 ); System.out.println(String.format("线程[%s]-进入process方法,number:%d,休眠%d毫秒" , Thread.currentThread().getName(), i, sleep)); try { Thread.sleep(sleep); } catch (InterruptedException ignore) { } return String.valueOf(i); } public void processSuccess () { System.out.println(String.format("线程[%s]-调用processSuccess方法" , Thread.currentThread().getName())); } } }
某次执行结果如下:
断路器[SlidingWindowCircuitBreaker-SERVICE]重置 线程[SERVICE-CircuitBreakerWorker-0]-进入process方法,number:0,休眠67毫秒 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN] 返回结果:null,number:0 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:1 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:2 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:3 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:4 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:5 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:6 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:7 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:8 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 返回结果:null,number:9 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN] 线程[SERVICE-CircuitBreakerWorker-1]-调用processSuccess方法 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED] 断路器[SlidingWindowCircuitBreaker-SERVICE]重置 线程[SERVICE-CircuitBreakerWorker-2]-进入process方法,number:0,休眠84毫秒 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN] 返回结果:null,number:0 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66 返回结果:null,number:1 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66 返回结果:null,number:2 断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN] 线程[SERVICE-CircuitBreakerWorker-3]-调用processSuccess方法 断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED] 断路器[SlidingWindowCircuitBreaker-SERVICE]重置 线程[SERVICE-CircuitBreakerWorker-4]-调用processSuccess方法
小结 生产上应用CircuitBreaker
模式建议使用主流实现例如Hystrix
或者更活跃的Sentinel
,但是要深入学习此模式则需要老老实实做一次推演。
参考资料:
CircuitBreaker - by Martin Fowler
《Release It! Design and Deploy Production-Ready Software》
本文代码仓库:
https://github.com/zjcscut/framework-mesh/tree/master/circuit-breaker
(本文完 c-4-d e-a-20201025)