本文共 7218 字,大约阅读时间需要 24 分钟。
上一篇实现了服务的限流,本篇来实现服务的熔断。
首先还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc
在现在的微服务架构下,由于服务众多,调用链路长,很可能其中某个服务有时会出现异常导致服务不可用,例如发布导致bug、机房网络问题等,这种时候如果没有一种保护机制的话,很容易引起服务雪崩。这个时候就引入了服务熔断,本篇就来实现一个简单的熔断器。
首先我们定义一下熔断的状态,如下图:
此图中标注了我们熔断器的三种状态:全开 半开 关闭。
他们的流转过程为:接下来看代码实现:
public class CircuitUtil { // 达到默认请求基数才判断开启熔断 private static final int DEFAULT_FAIL_COUNT = 5; // 半开转换为全开时间(毫秒数) private static final long DEFAULT_HALF_OPEN_TRANSFER_TIME = 10000; // 默认失败比例值开启熔断 private static final double DEFAULT_FAIL_RATE = 0.8D; // 计数 pair左边成功,右边失败 private Map> counter = new ConcurrentHashMap<>(); // 熔断器当前状态 private volatile CircuitStatusEnum status = CircuitStatusEnum.CLOSE; // 最后一次处于全开状态的时间 private volatile long timestamp; private final Semaphore semaphore = new Semaphore(1); /** * 简易熔断流程 * 1:判断是否打开熔断,打开则直接返回指定信息 * 2:执行逻辑,成功失败都进行标记 markSuccess markFail * * @param caller * @return * @throws Throwable */ public String doCircuit(String methodName, Caller caller, String serverHost, String param) throws Throwable { if (isOpen()) { return "{\"code\":-1,\"message\":\"circuit break\"}"; } String result; result = caller.call(serverHost, param); if ("exception".equals(result)) { markFail(methodName); return "{\"code\":-1,\"message\":\"exception request\"}"; } markSuccess(methodName); return result; } /** * 判断熔断是否打开 全开状态是判断是否转为半开并放过一个请求 * * @return */ private boolean isOpen() { boolean openFlag = true; // 关闭 if (status.equals(CircuitStatusEnum.CLOSE)) { openFlag = false; } // 全开 if (status.equals(CircuitStatusEnum.OPEN)) { // 未到半开时间,返回打开 if (System.currentTimeMillis() - timestamp < DEFAULT_HALF_OPEN_TRANSFER_TIME) { return openFlag; } // 已到半开时间,改为半开状态,通过一个请求 if (semaphore.tryAcquire()) { status = CircuitStatusEnum.HALF_OPEN; timestamp = System.currentTimeMillis(); openFlag = false; semaphore.release(); } } return openFlag; } /** * 标记成功 * 1.半开状态,成功一次转换为关闭状态 * 2.其他情况增加成功记录次数 * * @param operation */ private void markSuccess(String operation) { Pair pair = counter.get(operation); if (pair == null) { counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); } // 半开状态,成功一次转换为关闭状态 if (status == CircuitStatusEnum.HALF_OPEN) { status = CircuitStatusEnum.CLOSE; counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); } else { counter.get(operation).getKey().incrementAndGet(); } } /** * 标记失败 * 1.半开状态,失败一次回退到打开状态 * 2.其他状态判断错误比例决定是否打开熔断 * * @param operation */ private void markFail(String operation) { // 半开状态失败变更为全开,否则计数判断 if (status == CircuitStatusEnum.HALF_OPEN) { status = CircuitStatusEnum.OPEN; timestamp = System.currentTimeMillis(); } else { Pair pair = counter.get(operation); if (pair == null) { counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); pair = counter.get(operation); } int failCount = pair.getValue().incrementAndGet(); int successCount = pair.getKey().get(); int totalCount = failCount + successCount; double failRate = (double) failCount / (double) totalCount; if (totalCount >= DEFAULT_FAIL_COUNT && failRate > DEFAULT_FAIL_RATE) { status = CircuitStatusEnum.OPEN; timestamp = System.currentTimeMillis(); } } }}
然后是改造在我们的RPC框架中引入熔断,修改类RPCConsumer的如下方法:
publicT getBean(final Class clazz, final String appCode) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{ clazz}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 获取服务器地址 String serverHost = getServer(appCode); Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode); TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替 System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span)); BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0])); String result = circuitUtil.doCircuit(method.getName(), remoteCaller, serverHost, JSON.toJSONString(baseRequestBO)); return JSON.parseObject(result, method.getReturnType()); } }); }
其中的remoteCaller是我们本次修改的抽象出的远程调用,代码如下:
public interface Caller { /** * 调用 * @param serverHost * @param param * @return */ String call(String serverHost, String param) ;}public class RemoteCaller implements Caller { /** * netty客户端 */ private NettyClient nettyClient = new NettyClient(); /** * 远程调用 * * @param serverHost * @param param * @return */ @Override public String call(String serverHost, String param) { try { if (serverHost == null) { System.out.println("远程调用错误:当前无服务提供者"); return "{\"code\":404,\"message\":\"no provider\"}"; } // 连接netty,请求并接收响应 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler(); clientHandler.setParam(param); nettyClient.initClient(serverHost, clientHandler); String result = clientHandler.process(); System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverHost, param, result)); return result; } catch (Exception e) { System.out.println("远程服务调用失败:" + e); return "error"; } }}
接下来修改HelloServiceImpl的实现:
public class HelloServiceImpl implements HelloService { @Override public HelloResponse hello(HelloRequest request) { System.out.println("服务端收到请求,序列号:" + request.getSeq()); if (request.getSeq() < 0) { throw new RuntimeException("seq error"); } HelloResponse response = new HelloResponse(); response.setCode(200); response.setMessage("success:" + request.getSeq()); return response; }}
此处加入在传入的seq为负值时抛出异常。接下来启动服务器和客户端进行调用:
客户端传入-1时返回:{ "code":-1,"message":"exception request"}连续五次后返回:{ "code":-1,"message":"circuit break"}此时处于熔断状态,即便传入正值1,也会返回:{ "code":-1,"message":"circuit break"}等10S后在传入1时返回正常,服务恢复可正常使用。
在熔断后我们可以做相应的降级处理,比如一个远程调用,在发现对方服务响应大量超时时,我们可以将对方熔断,之后降级成为替代方案去继续执行我们的方法,就不会引起我们自己的服务也不可用了,达到我们自己服务的高可用的目的。
上述的代码都在github上了,各位可以下载后用docker启动一个zookeeper即可运行。
转载地址:http://hhsni.baihongyu.com/