博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【RPC】一步一步实现基于netty+zookeeper的RPC框架(六)
阅读量:4079 次
发布时间:2019-05-25

本文共 7218 字,大约阅读时间需要 24 分钟。

上一篇实现了服务的限流,本篇来实现服务的熔断。

    首先还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

    在现在的微服务架构下,由于服务众多,调用链路长,很可能其中某个服务有时会出现异常导致服务不可用,例如发布导致bug、机房网络问题等,这种时候如果没有一种保护机制的话,很容易引起服务雪崩。这个时候就引入了服务熔断,本篇就来实现一个简单的熔断器。

    首先我们定义一下熔断的状态,如下图:

微信图片_20190622202311.png

此图中标注了我们熔断器的三种状态:全开 半开 关闭

他们的流转过程为:

  1. 初始时为关闭状态。
  2. 当遇到错误到我们预设的阈值比例后转换为全开状态。
  3. 经过一定时间后,熔断器变为半开状态。
  4. 半开状态时允许通过一个请求,此请求成功则转为关闭状态,失败则变为全开状态。

接下来看代码实现:

public class CircuitUtil {
// 达到默认请求基数才判断开启熔断 private static final int DEFAULT_FAIL_COUNT = 5; // 半开转换为全开时间(毫秒数) private static final long DEFAULT_HALF_OPEN_TRANSFER_TIME = 10000; // 默认失败比例值开启熔断 private static final double DEFAULT_FAIL_RATE = 0.8D; // 计数 pair左边成功,右边失败 private Map
> counter = new ConcurrentHashMap<>(); // 熔断器当前状态 private volatile CircuitStatusEnum status = CircuitStatusEnum.CLOSE; // 最后一次处于全开状态的时间 private volatile long timestamp; private final Semaphore semaphore = new Semaphore(1); /** * 简易熔断流程 * 1:判断是否打开熔断,打开则直接返回指定信息 * 2:执行逻辑,成功失败都进行标记 markSuccess markFail * * @param caller * @return * @throws Throwable */ public String doCircuit(String methodName, Caller caller, String serverHost, String param) throws Throwable {
if (isOpen()) {
return "{\"code\":-1,\"message\":\"circuit break\"}"; } String result; result = caller.call(serverHost, param); if ("exception".equals(result)) {
markFail(methodName); return "{\"code\":-1,\"message\":\"exception request\"}"; } markSuccess(methodName); return result; } /** * 判断熔断是否打开 全开状态是判断是否转为半开并放过一个请求 * * @return */ private boolean isOpen() {
boolean openFlag = true; // 关闭 if (status.equals(CircuitStatusEnum.CLOSE)) {
openFlag = false; } // 全开 if (status.equals(CircuitStatusEnum.OPEN)) {
// 未到半开时间,返回打开 if (System.currentTimeMillis() - timestamp < DEFAULT_HALF_OPEN_TRANSFER_TIME) {
return openFlag; } // 已到半开时间,改为半开状态,通过一个请求 if (semaphore.tryAcquire()) {
status = CircuitStatusEnum.HALF_OPEN; timestamp = System.currentTimeMillis(); openFlag = false; semaphore.release(); } } return openFlag; } /** * 标记成功 * 1.半开状态,成功一次转换为关闭状态 * 2.其他情况增加成功记录次数 * * @param operation */ private void markSuccess(String operation) {
Pair
pair = counter.get(operation); if (pair == null) {
counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); } // 半开状态,成功一次转换为关闭状态 if (status == CircuitStatusEnum.HALF_OPEN) {
status = CircuitStatusEnum.CLOSE; counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); } else {
counter.get(operation).getKey().incrementAndGet(); } } /** * 标记失败 * 1.半开状态,失败一次回退到打开状态 * 2.其他状态判断错误比例决定是否打开熔断 * * @param operation */ private void markFail(String operation) {
// 半开状态失败变更为全开,否则计数判断 if (status == CircuitStatusEnum.HALF_OPEN) {
status = CircuitStatusEnum.OPEN; timestamp = System.currentTimeMillis(); } else {
Pair
pair = counter.get(operation); if (pair == null) {
counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger())); pair = counter.get(operation); } int failCount = pair.getValue().incrementAndGet(); int successCount = pair.getKey().get(); int totalCount = failCount + successCount; double failRate = (double) failCount / (double) totalCount; if (totalCount >= DEFAULT_FAIL_COUNT && failRate > DEFAULT_FAIL_RATE) {
status = CircuitStatusEnum.OPEN; timestamp = System.currentTimeMillis(); } } }}

然后是改造在我们的RPC框架中引入熔断,修改类RPCConsumer的如下方法:

public 
T getBean(final Class
clazz, final String appCode) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{
clazz}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 获取服务器地址 String serverHost = getServer(appCode); Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode); TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替 System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span)); BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0])); String result = circuitUtil.doCircuit(method.getName(), remoteCaller, serverHost, JSON.toJSONString(baseRequestBO)); return JSON.parseObject(result, method.getReturnType()); } }); }

其中的remoteCaller是我们本次修改的抽象出的远程调用,代码如下:

public interface Caller {
/** * 调用 * @param serverHost * @param param * @return */ String call(String serverHost, String param) ;}public class RemoteCaller implements Caller {
/** * netty客户端 */ private NettyClient nettyClient = new NettyClient(); /** * 远程调用 * * @param serverHost * @param param * @return */ @Override public String call(String serverHost, String param) {
try {
if (serverHost == null) {
System.out.println("远程调用错误:当前无服务提供者"); return "{\"code\":404,\"message\":\"no provider\"}"; } // 连接netty,请求并接收响应 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler(); clientHandler.setParam(param); nettyClient.initClient(serverHost, clientHandler); String result = clientHandler.process(); System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverHost, param, result)); return result; } catch (Exception e) {
System.out.println("远程服务调用失败:" + e); return "error"; } }}

接下来修改HelloServiceImpl的实现:

public class HelloServiceImpl implements HelloService {
@Override public HelloResponse hello(HelloRequest request) {
System.out.println("服务端收到请求,序列号:" + request.getSeq()); if (request.getSeq() < 0) {
throw new RuntimeException("seq error"); } HelloResponse response = new HelloResponse(); response.setCode(200); response.setMessage("success:" + request.getSeq()); return response; }}

此处加入在传入的seq为负值时抛出异常。接下来启动服务器和客户端进行调用:

客户端传入-1时返回:{
"code":-1,"message":"exception request"}连续五次后返回:{
"code":-1,"message":"circuit break"}此时处于熔断状态,即便传入正值1,也会返回:{
"code":-1,"message":"circuit break"}等10S后在传入1时返回正常,服务恢复可正常使用。

在熔断后我们可以做相应的降级处理,比如一个远程调用,在发现对方服务响应大量超时时,我们可以将对方熔断,之后降级成为替代方案去继续执行我们的方法,就不会引起我们自己的服务也不可用了,达到我们自己服务的高可用的目的。

上述的代码都在github上了,各位可以下载后用docker启动一个zookeeper即可运行。

转载地址:http://hhsni.baihongyu.com/

你可能感兴趣的文章
19. Remove Nth Node From End of List (双指针)
查看>>
49. Group Anagrams (String, Map)
查看>>
139. Word Break (DP)
查看>>
Tensorflow入门资料
查看>>
剑指_用两个栈实现队列
查看>>
剑指_顺时针打印矩阵
查看>>
剑指_栈的压入弹出序列
查看>>
剑指_复杂链表的复制
查看>>
服务器普通用户(非管理员账户)在自己目录下安装TensorFlow
查看>>
星环后台研发实习面经
查看>>
大数相乘不能用自带大数类型
查看>>
字节跳动后端开发一面
查看>>
CentOS Tensorflow 基础环境配置
查看>>
centOS7安装FTP
查看>>
FTP的命令
查看>>
CentOS操作系统下安装yum的方法
查看>>
ping 报name or service not known
查看>>
FTP 常见问题
查看>>
zookeeper单机集群安装
查看>>
do_generic_file_read()函数
查看>>