当前位置:首页 » 《随便一记》 » 正文

RocketMQ 消息负载均衡策略解析——图解、源码级解析

0 人参与  2022年07月22日 08:22  分类 : 《随便一记》  评论

点击全文阅读


? Java学习:Java从入门到精通总结

? 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

? 绝对不一样的职场干货:大厂最佳实践经验指南


? 最近更新:2022年6月25日

? 个人简介:通信工程本硕?、Java程序员?。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

? 点赞 ? 收藏 ⭐留言 ? 都是我最大的动力!


文章目录

默认策略随机策略使用方式源码 Hash策略使用方式源码 就近策略使用方式源码

Producer发送消息时,会首先获取Topic路由信息(通过本地 + 注册中心拉取),RocketMQ的架构里有多个Broker服务器,而消息队列也会存在于多个Broker服务器里,所以就需要负载均衡策略来将流量尽可能均匀的打到所有服务器上。

在这里插入图片描述

本章节就介绍一下RocketMQ中常用的四种负载均衡策略。



默认策略

找到Producer发送消息时选择消息队列的逻辑,在DefaultMQProducerImpl类中定义了sendDefaultImpl方法:
在这里插入图片描述
进入到selectOneMessageQueue方法里:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);    }

上述代码的MQFaultStrategy类中定义了selectOneMessageQueue方法:

public class MQFaultStrategy {/**     * 默认负载均衡策略     *      * @param tpInfo     * @param lastBrokerName     * @return     */    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    // 检查消息延迟容错开关        if (this.sendLatencyFaultEnable) {            try {                // 按顺序依次选择                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)                        pos = 0;                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    // 选取时仍然会先选择相同集群下的其他MessageQueue                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;                    }                }// 从其他Broker里选择一个,该列表的节点根据是否可用,超时时间和最新可用时间做了排序                /*                 * ......                 */                            } catch (Exception e) {            }            // 默认策略            return tpInfo.selectOneMessageQueue();        }        // 延迟容忍开关没开时的默认策略        return tpInfo.selectOneMessageQueue(lastBrokerName);    }}

根据源码可以很清楚地看到,默认策略就是依次选择消息队列进行发送,具体的执行细节如下:

判断延迟容错开关是否打开了,如果打开了就根据默认策略返回一个MQ,否则直接使用TopicPublishInfo中的selectOneMessageQueue(lastBrokerName)方法返回一个MQ获取当前轮询到的MQ的索引。当第一次发送消息时,ThreadLocal里存的值如果为空就随机生成一个数字,否则就给这个数字加1

这里tpInfo.getSendWhichQueue()是存在于ThreadLocal里的,有关资料参考 https://javaguide.cn/java/concurrent/threadlocal/

如果上一次发送该集群超时失败,选取时仍然会先选择相同集群下的其他MessageQueue如果第3步里没有选出来,则从之前失败过的列表中选择一个较好的Broker

如何选一个较好的Broker呢?
RocketMQ的实现是按照该列表的节点根据是否可用,超时时间和最新可用时间做了排序

如果第3、4步都没有选出来,则走到默认策略(轮询出一个新的MQ来)
public MessageQueue selectOneMessageQueue() {        int index = this.sendWhichQueue.getAndIncrement();        int pos = Math.abs(index) % this.messageQueueList.size();        if (pos < 0)            pos = 0;        return this.messageQueueList.get(pos);    }
第一步里如果没有打开延迟容错开关,进入
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {        if (lastBrokerName == null) {            return selectOneMessageQueue();        } else {            int index = this.sendWhichQueue.getAndIncrement();            for (int i = 0; i < this.messageQueueList.size(); i++) {                int pos = Math.abs(index++) % this.messageQueueList.size();                if (pos < 0)                    pos = 0;                MessageQueue mq = this.messageQueueList.get(pos);                if (!mq.getBrokerName().equals(lastBrokerName)) {                    return mq;                }            }            return selectOneMessageQueue();        }    }

从不是上一次使用的Broker里选一个MQ出来。


上面过程,用流程图总结如下:
在这里插入图片描述



随机策略

使用方式

在编程中,想使用随机策略的话也非常简单,只用传进去一个选择器即可:

producer.send(message, new SelectMessageQueueByRandoom(), " ");

有一个比较有意思的问题,我这里是用的3.5.8版本的RocketMQ,上面方法里的【随机】一词拼写错误,正确的应该是Random,可能是一开始就手误了吧,后面为了兼容性不好直接修改名称。。。。

源码

public class SelectMessageQueueByRandoom implements MessageQueueSelector {    private Random random = new Random(System.currentTimeMillis());    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        int value = random.nextInt();        if (value < 0) {            value = Math.abs(value);        }        value = value % mqs.size();        return mqs.get(value);    }}

SelectMessageQueueByRandoom的源码也很易读,就是随机选取一个MQ并返回



Hash策略

使用方式

要使用Hash策略发送消息,只需传入一个SelectMessageQueueByHash对象即可:

producer.send(message, new SelectMessageQueueByHash(), " ");

源码

public class SelectMessageQueueByHash implements MessageQueueSelector {    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {    // arg的计算哈希值        int value = arg.hashCode();        if (value < 0) {            value = Math.abs(value);        }        value = value % mqs.size();        return mqs.get(value);    }}

和随机策略类似,Hash负载均衡策略也很简单,通过arg的hash值来决定返回哪一个MQ



就近策略

使用方式

要使用Hash策略发送消息,只需传入一个SelectMessageQueueByMachineRoom对象即可:

producer.send(message, new SelectMessageQueueByMachineRoom(), " ");

源码

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {    private Set<String> consumeridcs;    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        return null;    }    public Set<String> getConsumeridcs() {        return consumeridcs;    }    public void setConsumeridcs(Set<String> consumeridcs) {        this.consumeridcs = consumeridcs;    }}

有意思的是机房策略的select代码在RocketMQ里并没有编写,而是直接返回null,如果用户有这个需求的话要自行编写!


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/43667.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 姐姐被禁止回家过年后我杀疯了章节目录小说-刘初琴免费阅读全文
  • 爆款小说由作者务实***所创作的姐姐被禁止回家过年后我杀疯了在线阅读
  • 无端坠入红尘梦小说云袭月秦执礼(无端坠入红尘梦小说)全文免费阅读无弹窗大结局_(云袭月秦执礼免费阅读全文大结局)最新章节列表_笔趣阁(云袭月秦执礼) -
  • 补贴系统:我在古代扩展团队赚大钱小说全文赵百汇赵锦衣小说免费阅读完整版_《补贴系统:我在古代扩展团队赚大钱小说全文》赵百汇赵锦衣最新章节在线阅读 -
  • 杀手跳崖没死,捡个男人当药引最新章节列表(谢砚卿沈宁)最新章节免费在线阅读_(杀手跳崖没死,捡个男人当药引最新章节列表)谢砚卿沈宁完结版免费阅读 -
  • 我回归后,全家人痛改前非乔念萧衡,我回归后,全家人痛改前非在线无弹窗阅读
  • 无限流:我的身份越来越离谱免费阅读,无限流:我的身份越来越离谱章节在线阅读
  • 《重生八零,手撕渣男嫁团长》小说章节在线试读,《重生八零,手撕渣男嫁团长》最新章节目录
  • 我回归后,全家人痛改前非免费阅读,我回归后,全家人痛改前非乔念萧衡
  • 《傅总,你又一次让我失望了》小说大结局免费试读 陆奕然小说
  • 向女友烟花求婚的跨年夜,她跟别人私奔了免费阅读,向女友烟花求婚的跨年夜,她跟别人私奔了林鸿风谷文君
  • 已完结小说《向女友烟花求婚的跨年夜,她跟别人私奔了》最新章节

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1