原创

RocketMQ结构和原理分析

RocketMQ组织结构

RocketMQ角色

从图中可以看出,RocketMQ主要由4部分构成:

  • NameServer集群
  • Broker集群
  • Producer集群
  • Consumer集群

其中,Broker集群、Producer集群、Consumer集群都需要与NameServer集群进⾏通信。

Broker集群:
Broker⽤于接收⽣产者发送消息,或者接收消费者消费消息的请求。⼀个Broker集群由多组Master/Slave组成,Master可写可读,Slave只可以读,Master将写⼊的数据同步给Slave。
每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建⽴⻓连接,注册⾃⼰的信息,之后定时上报。

Producer集群:
消息的⽣产者,通过NameServer集群获得Topic的路由信息,包括Topic下⾯有哪些Queue,这些Queue分布在哪些Broker上等。Producer只会将消息发送到Broker的Master节点上,因此只需要与Master节点建⽴连接。

Consumer集群:
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建⽴连接。

NameServer

主要作用就是管理Broker。

集群中的各个服务都需要通过NameServer来了解集群中其他组件的状态。

服务发现

当生产者和消费者客户端想要发送消息和订阅队列时,都要通过注册中心获得每个服务端的信息。其中【NameServer】就是作为注册中心提供服务注册和服务发现功能。

当客户端获得服务端的状态后,通过负载均衡策略发送或者订阅分配到各个Broker上。

NameServer服务注册发现功能

P.S.
为什么RocketMQ要自己开发一套NameServer呢?
RocketMQ设计之初时参考的另⼀款消息中间件Kafka就使⽤了Zookeeper,Zookeeper其提供了Master选举、分布式锁、数据的发布和订阅等诸多功能。
⽽RocketMQ的架构设计决定了只需要⼀个轻量级的元数据服务器就⾜够了,只需要保持最终⼀致,⽽不需要Zookeeper这样的强⼀致性解决⽅案,不需要再依赖另⼀个中间件,从⽽减少整体维护成本。

保证最终一致性

NameServer作为⼀个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,在某个时刻各个节点数据可能不⼀致的情况下,如何保证客户端可以最终拿到正确的数据。下⾯分别从路由注册、路由剔除,路由发现三个⻆度进⾏介绍。

服务注册

对于Zookeeper、Etcd这样强⼀致性组件,数据只要写到主节点,内部会通过状态机将数据复制到其他节点,Zookeeper使⽤的是Zab协议,etcd使⽤的是raft协议。

但是NameServer节点之间是互不通信的,⽆法进⾏数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮询NameServer列表,与每个NameServer节点建⽴⻓连接,发起注册请求。NameServer内部会维护⼀个Broker表,⽤来动态存储Broker的信息。

同时,Broker节点为了证明⾃⼰是存活的,会将最新的信息上报给NameServer,然后每隔30秒向NameServer发送⼼跳包,⼼跳包中包含【BrokerId、Broker地址、Broker名称、Broker所属集群名称、队列和brokerIP对应关系】等等,然后NameServer接收到⼼跳包后,会更新时间戳,记录这个Broker的最新存活时间

NameServer在处理⼼跳包的时候,存在多个Broker同时操作⼀张Broker表,为了防⽌并发修改Broker表导致不安全,路由注册操作引⼊了ReadWriteLock读写锁,这个设计亮点允许多个消息⽣产者并发读,保证了消息发送时的⾼并发,但是同⼀时NameServer只能处理⼀个Broker⼼跳包,多个⼼跳包串⾏处理。这也是读写锁的经典使⽤场景,即读多写少。这样也保证了producer或者consumer在读取路由信息时,如果此时正在处理心跳包,那么需要阻塞直到最新的数据为止。

服务卸载

正常情况下,如果Broker关闭,则会与NameServer断开⻓连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。

除了这种方式之外,NameServer每隔10s会遍历Broker表,如果某个Broker的⼼跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。

特别的,对于⼀些⽇常运维⼯作,例如:Broker升级,RocketMQ提供了⼀种优雅剔除路由信息的⽅式。例如在升级⼀个Master节点之前,可以先通过命令⾏⼯具禁⽌这个Broker的写权限,发送消息到这个Broker的请求,都会收到⼀个NO_PERMISSION响应,客户端会⾃动重试其他的Broker。当观察到这个broker没有流量后,再将这个Broker移除。

服务发现

路由发现是客户端的⾏为,这⾥的客户端主要说的是⽣产者和消费者。具体来说:

  • 对于⽣产者,可以发送消息到多个Topic,因此生产者是在发送第⼀条消息时,才会根据Topic获取从NameServer获取路由信息。
  • 对于消费者,订阅的Topic⼀般是固定的,所在在启动时就会拉取。

那么⽣产者/消费者在⼯作的过程中,如果路由信息发⽣了变化怎么处理呢?常见的几种情况有Broker集群新增了节点,节点宕机或者Queue的数量发⽣了变化。前⾯讲解NameServer在路由注册或者路由剔除过程中,并不会主动推送到客户端的,这意味着,需要由客户端拉取主题的最新路由信息。

这里我们可以得出,客户端会定期拉取Topic最新路由信息。这里我们从源码入手看看是怎样做的。

默认我们创建producer和consumer都是创建的DefaultMQProducerDefaultMQPushConsumer,从代码中我们可以发现在DefaultMQProducerDefaultMQPushConsumer执行start方法时,都会调用到MQClientInstance#start方法,来看下这个方法:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

this.startScheduledTask();这里开启了一些定时任务,继续向下:

private void startScheduledTask() {
    // 更新NameServer
    ...

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // 更新路由信息
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // 清除已下线的broker
                MQClientInstance.this.cleanOfflineBroker();
                // 向broker发送心跳包
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 其他
    ...
}

通过这里我们发现每隔this.clientConfig.getPollNameServerInterval()调用一次MQClientInstance#updateTopicRouteInfoFromNameServerClientConfig#pollNameServerInterval默认是30秒

public void updateTopicRouteInfoFromNameServer() {
    //1 需要更新路由信息的Topic集合
    Set<String> topicList = new HashSet<String>();
    //2 添加消费者需要使用到的Topic到集合中
    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }
    //3 添加生产者需要使用到的topic到集合中
    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }
    //4 逐一从NameServer更新每个Topic的路由信息
    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

这里我们可以很明确的看到,MQClientInstance从【producer】和【consumer】获取到需要的topicList,然后根据topic从【NameServer】中更新【broker】。

客户端NameServer选择策略

上面我们说到了RocketMQ的【client】是如何维护【broker】的,那么【client】是通过【NameServer】去更新【broker】的,【NameServer】是个集群并且节点间是不通信的,那么【client】又是从哪个【NameServer】节点获取数据的呢?

这里我们以【producer】为例,在【producer】发送消息时,调用到MQClientAPIImpl#sendMessageSync方法

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

NettyRemotingClient#getAndCreateNameserverChannel

private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            // 一旦已经选中一个,就会一直使用这个NameServer
            // 直到在更新NameServer信息时如果该NameServer不在集群内,会将namesrvAddrChoosed删除
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null && !addrList.isEmpty()) {
                // 使用round robin策略
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
                throw new RemotingConnectException(addrList.toString());
            }
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

从代码中我们可以看到,【client】使⽤round-robin策略选择连接的【NameServer】。但是在选择了⼀个【NameServer】节点之后,后⾯总是会优先选择这个【NameServer】,除⾮与这个【NameServer】节点从【NameServer】集群中清除的情况下,才会继续round-robin

为什么客户端不与所有【NameServer】节点建⽴连接呢,⽽是只选择其中⼀个?通常NameServer节点是固定的⼏个,但是客户端的数量可能是成百上千,为了减少每个【NameServer】节点的压⼒,所以每个客户端节点只随机与其中⼀个【NameServer】节点建⽴连接。

为了尽可能保证【NameServer】集群每个节点的负载均衡,在round-robin策略选择时,每个客户端的初始随机位置都不同,如下:

private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());

private static int initValueIndex() {
    Random r = new Random();

    return Math.abs(r.nextInt() % 999) % 999;
}

总结

相对来说,【NameServer】的稳定性⾮常⾼。原因有两个:

  1. 【NameServer】互相独⽴,彼此没有通信关系,单台【NameServer】挂掉,不影响其他节点。
  2. 【NameServer】不会有频繁的读写,所以性能开销⾮常⼩,稳定性很⾼。

Broker

存储并转发消息

【Broker】是RocketMQ的核⼼,消息存储是【Broker】的核⼼,提供了消息的接收、存储、拉取等功能,⼀般都需要保证【Broker】的⾼可⽤,所以会配置【Broker Slave】,当【Master】挂掉之后,【Consumer】仍然可以消费【Slave】。

Broker主从

Queue

RocketMQ都是磁盘消息队列的模式,在集群模式下,对于同⼀个消费组,⼀个分区只⽀持⼀个消费者来消费消息。

过少的分区,会导致消费速度落后于消息的⽣产速度,导致消息堆积,所以在实际⽣产环境中,⼀个【Topic】会设置成多分区的模式,来⽀持多个消费者,提供消费消息,进⾏并发消费。参照下图:
topic多分区
在RocketMQ中,这个分区就是Message Queue,⽤于并⾏发送和接收消息。

将【Topic】分⽚再切分为若⼲等分,其中的⼀份就是⼀个【Queue】。每个【Topic】分⽚分配的【Queue】的数量可以不同,由⽤户在创建【Topic】时指定。

从这里我们可以看到,【Queue】是在分片的基础上建立的,数据分⽚的主要⽬的是突破单点的资源(⽹络带宽,CPU,内存或⽂件存储)限制从⽽实现⽔平扩展。RocketMQ在进⾏【Topic】分⽚以后,已经达到⽔平扩展的⽬的了,为什么还需要进⼀步切分为【Queue】呢?

解答这个问题还需要从负载均衡说起。以消息消费为例,借⽤RocketMQ官⽅⽂档中的【Consumer】负载均衡示意图来说明:
队列来历
如图所示,TOPIC_A在⼀个【Broker】分片上有5个【Queue】,⼀个【Consumer Group】内有2个【Consumer】按照集群消费的⽅式消费消息,按照平均分配策略进⾏负载均衡得到的结果是:第⼀个【Consumer】消费3个【Queue】,第⼆个【Consumer】消费2个【Queue】。如果增加【Consumer】,每个【Consumer】分配到的【Queue】会相应减少。

在RocketMQ中,⼀个【Topic】可以分布在各个【Broker】上,我们可以把⼀个【Topic】分布在⼀个【Broker】上的⼦集定义为⼀个【Topic】分⽚。在生产环境中【Broker】集群节点数量通常不会太多,那么简单的通过分片来作为分区基本单元粒度还是比较大,所以RocketMQ引入了【Queue】:
分片和Queue关系

Queue数量指定⽅式
  1. 代码指定
    producer.setDefaultTopicQueueNums(8);
    
  2. 【Broker】配置⽂件指定
    【Broker】服务器的配置⽂件broker.properties
    defaultTopicQueueNums=16
    
  3. mqadmin命令
    mqadmin命令
    其中-r-w建议一致。
  4. rocket-console控制台指定
    mq-console队列数量配置

Producer

消息发送方

【Producer】与【NameServer】保持长链接,每个30s获取最新的路由信息,同时在在发送第一条消息的时候通过路由信息与【Broker】建立长链接。

【Producer】发送消息时候,是否应该知道应该把消息发送到哪个【Broker】服务器中的队列中进⾏存储?

【Producer】根本不知道把消息发送到哪个【Borker】服务器。

Producer Group

⽣产者组,简单来说就是多个发送同⼀类消息的⽣产者称之为⼀个⽣产者组。⼀个⽣产者组,代表着⼀群【Topic】相同的【Producer】。

Consumer

消息接收方

【Consumer】与【NameServer】保持⻓连接,每隔30s将会从【NameServer】服务器中查询【Topic】路由信息,查询到路由信息信息后,就会根据【ip映射⽂件】从【Broker】中消费消息。

【Consumer】获取路由信息后,会在本地进⾏缓存。【Consumer】和broker保持⻓连接,每隔30s向【Broker】发送⼼跳检测,检测【Broker】是否处于活跃状态。

Consumer Group

消费者组,和⽣产者类似,消费同⼀类消息的多个【Consumer】实例组成⼀个消费者组。⼀个消费者组,代表着⼀群订阅相同【Topic】并且【Tag】也相同(即逻辑相同)的【Consumer】集合。通过⼀个消费者组,可容易的进⾏负载均衡以及容错。
消费者组
从图中可以看到,集群模式下,同一个消费者组内的每个消费者订阅同一个【Topic】的不同的队列,不同消费者组的消费状态相互独立。

Topic

Topic是指一个消息的种类,比如一些消息都与订单相关,那么我们可以创建一个订单的【Order_Topic】,将这一类消息的【Topic】设置为该【Order_Topic】。

⼀个【Producer】可以发送一个或者多个【Topic】的消息;⼀个【Consumer】可以订阅⼀个或者多个【Topic】消息。
如下图:
Topic作用
当然一个消费者也可以同时订阅多个【Topic】,比如说上图中猴子也可以订阅【鱼Topic】。

Tag

标签可以被认为是对【Topic】进⼀步细化。⼀般在相同业务模块中通过引⼊标签来标记不同⽤途的消息。

【Tag】表示消息的第⼆级类型,⽐如交易消息⼜可以分为

  • 交易创建消息
  • 交易完成消息
    ...
    rocketMQ标签

Topic和Tag的使用

  1. 消息类型是否⼀致:如普通消息,事务消息,定时消息,顺序消息,不同的消息类型使⽤不同的【Topic】,⽆法通过【Tag】进⾏区分。
  2. 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使⽤不同的【Topic】进⾏区分;⽽同样是天猫交易消息,电器类订单、⼥装类订单、化妆品类订单的消息可以⽤【Tag】进⾏区分。
  3. 消息优先级是否⼀致:如同样是物流消息,盒⻢必须⼩时内送达,天猫超市24⼩时内送达,淘宝物流则相对会会慢⼀些,不同优先级的消息⽤不同的【Topic】进⾏区分。
  4. 消息量级是否相当:有些业务消息虽然量⼩但是实时性要求⾼,如果跟某些万亿量级的消息使⽤同⼀个【Topic】,则有可能会因为过⻓的等待时间⽽『饿死』,此时需要将不同量级的消息进⾏拆分,使⽤不同的【Topic】。

Message

⼀个【Message】必须指定【Topic】。⽤户在发送时可以设置message key,便于之后查询和跟踪。【Message】还有⼀个可选的【Tag】设置,以便消费端可以基于【Tag】进⾏过滤消息。也可以添加额外的键值对,例如你需要⼀个业务【key】来查找【Broker】上的消息,⽅便在开发过程中诊断问题。

可以从Message的属性看到一个消息的构成:

// 主题
private String topic;
private int flag;
// 属性 message key 设置好之后其实也是一条属性
private Map<String, String> properties;
// 消息内容
private byte[] body;
// 事务Id 只有事务消息有用
private String transactionId;

Offset

在RocketMQ中有两个【Offset】,一个是CommitLog Offset,一个是ConsumeQueue Offset

ConsumeQueue Offset

ConsumeQueue Offset是指某个【Topic】下的⼀条消息在某个【Message Queue】⾥的位置,通过ConsumeQueue Offset的值可以定位到这条消息,或者指示【Consumer】从这条消息开始向后继续处理。

ConsumeQueue Offset分为本地⽂件类型和【Broker】代存的类型两种。
ConsumeQueue Offset代码结构

RocketMQ集群有两种消费模式:

  • 集群模式(Clustering)
    也就是同⼀个【Consumer Group】⾥的多个消费者每⼈消费⼀部分,各⾃收到的消息内容不⼀样。这种情况下,由【Broker】端存储和控制ConsumeQueue Offset的值,使⽤RemoteBrokerOffsetStore结构。
  • 广播模式
    每个【Consumer】都收到这个【Topic】的全部消息,各个【Consumer】间相互没有⼲扰,RocketMQ使⽤LocalfileOffsetStore,把ConsumeQueue Offset存到本地。
ConsumeQueue Offset配置

ConsumeFromWhere枚举类表示设置ConsumeQueue Offset的位置。

这个类有三个类型(实际是6个,其他3个已过期)

  • CONSUME_FROM_LAST_OFFSET
  • CONSUME_FROM_FIRST_OFFSET
  • CONSUME_FROM_TIMESTAMP

默认是CONSUME_FROM_FIRST_OFFSET,也就是从现有最小的【Offset】开始读取,如果从队列开始到需要读取的消息之间有很⼤的范围,⽤CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,⽐如:

Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 时间戳格式是精确到秒的
Consumer.setConsumeTimestamp("20200723171201");

注意设置读取位置不是每次都有效,它的优先级默认在【Offset Store】后⾯,⽐如在DefaultMQPushConsumer使用BROADCASTING模式下,默认是从LocalfileOffsetStore⾥读取某个【Topic】对应【ConsumerGroup】的【Offset】,当读取不到【Offset】的时候,ConsumeFromWhere的设置才⽣效。
⼤部分情况下这个设置在【ConsumerGroup】初次启动时有效。如果【Consumer】正常运⾏后被停⽌,然后再启动,会接着上次的【Offset】开始消费,ConsumeFromWhere的设置无效。

⽣产环境上⼀般使⽤集群模式,主要记录集群模式下【Offset】的管理,即RemoteBrokerOffsetStore,这种模式下,【Offset】被记录在【Broker】中,【ConsumeOffset】的是以json的形式持久化到磁盘⽂件中,⽂件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下:

{
  "offsetTable": {
    "test-topic@test-group": {
      "0": 88526,
      "1": 88528
    }
  }
}

从文件格式上可以看出,key为topic@consumerGroup,value对应的是当前【Broker】中的队列和它的【Offset】值组合。

CommitLog Offset

在RocketMQ消息类型中我们说到过【Consume Queue】中记录的内容:
consumeQueue存储单元

从图中可以看到,在【ConsumeQueue】中记录的就是Commit Log Offset,然后再根据Commit Log OffsetSize从【Commit Log】中找到相应的数据返回给【Consumer】。

网络模型

RocketMQ使⽤Netty框架实现⾼性能的⽹络传输。

RocketMQ的核心业务都是在【Broker】内完成,这里我们网络模型主要针对【Broker】。
RocketMQ对【Broker】的线程池进⾏了精细的隔离。使得消息的⽣产、消费、客户端⼼跳、客户端注册等请求不会互相⼲扰。如下是各个业务执⾏线程池和【Broker】处理的报⽂类型的对应关系,从下图中我们也可以看出【Broker】的核⼼功能点。
Broker各个线程池作用

RocketMQ原理

消息存储

分布式队列因为有⾼可靠性的要求,所以数据要进⾏持久化存储。
消息持久化

存储方式

⽬前业界较为常⽤的⼏款产品(RocketMQ/Kafka/RabbitMQ)均采⽤的是消息刷盘⾄所部署的虚拟机/物理机的⽂件系统来做持久化(刷盘⼀般可以分为异步刷盘和同步刷盘两种模式)。

消息刷盘为消息存储提供了⼀种⾼效率、⾼可靠性和⾼性能的持久化⽅式。除⾮部署MQ机器本身或是本地硬盘挂了,否则⼀般是不会出现⽆法持久化的问题。

RocketMQ采用顺序存储的方式进行消息持久化,磁盘如果使⽤得当,磁盘的速度完全可以匹配⽹络的数据传输速度。⽬前的⾼性能磁盘,⽐如SSD,顺序写速度可以达到600MBs,超过了⼀般⽹卡的传输速度。但是磁盘随机写的速度只有⼤概100kb/s,和顺序写的性能相差6000倍!因为有如此巨⼤的速度差别,好的消息队列系统会⽐普通的消息队列系统速度快多个数量级,RocketMQ的消息⽤顺序写保证了消息存储的速度。

与关系型数据库对比

目前有很多中间件采用关系型数据库作为数据持久化方案,比如Nacos使用MySQL作为持久化。

为什么RocketMQ没有使用关系型数据库作为持久化媒介呢?
这里主要从性能方面考虑,采用顺序刷盘的方式比数据落到关系型数据库中要快很多,而且使用关系型数据库的优势在于查找消息的方便,而目前RocketMQ对于消息的索引只应用在时间和MessageId层面,这些利用文件索引都可以实现,所以RocketMQ选择了直接刷盘的方式进行持久化。

存储结构

消息的存储是由consumequeuecommitlog配合完成的。

  • commitlog:消息真正的物理存储⽂件(MappedFile)。
  • consumequeue:是消息的逻辑队列,类似数据库的索引⽂件,存储的是指向物理存储的地址。
ConsumeQueue

【Topic】下的每个队列都有对应的【ConsumeQueue】⽂件,内容也会被持久化到磁盘。

默认地址:${store}/consumequeue/{topicName}/{queueid}/fileNameBroker
从文件地址可以看出⼀个【Topic】有多个队列,每个队列对应一个队列ID。

CommitLog

消息⽂件的存储地址:${store}/commitlog

在RocketMQ中使用MappedFile作为持久化文件对象,每个文件大小为1G

MappedFile⽂件⽣成有⼀定的规则:

  • 名字⻓度为20位,左边补零,剩余为起始偏移量;
  • ⽂件的命名是以1G(1024 1024 1024 = 1073741824)进⾏递增,第⼀个⽂件是00000000000000000000
  • 消息存储的时候会顺序写⼊⽂件,当⽂件满了则写⼊下⼀个⽂件。也就是说第⼆个⽂件名字为000000001073741824,第三个名字是以00000000002147483648进⾏命名,依次递增。体现在代码中就是以上⼀个⽂件的偏移量加上1073741824即可。
public class MessageStoreConfig {
    // CommitLog file size,default is 1G
    private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
     ...
}

public class MappedFileQueue {
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        ...
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            //计算⽂件名称的逻辑
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
        ...
    }
}
CommitLog和ConsumeQueue存储关系

rocketMQ消息存储结构

rocketMQ消息存储结构2
如上图所示,【producer】发送消息到【broker】之后,会将消息具体内容持久化到【commitLog】⽂件(MappedFile)中,再分发到【topic】下的消费队列【ConsumeQueue】,消费者提交消费请求时,【broker】从该【consumer】订阅的队列中根据请求的【offset】得到待消费的消息在【CommitLog】中的位置和消息大小,同时也可以通过【Tag】判断是否需要消费该消息,在确认需要消费该消息后从【CommitLog】中获取具体的消息内容返回给【consumer】。在这个过程中,【consumer】提交的【offset】为本次请求的起始消费位置,即beginOffset;【ConsumeQueue】中的内容保存了【CommitLog】中具体消息的位置和大小。

【ConsumeQueue】中每个消息索引信息⻓度为20字节:

  • 8字节的CommitLog Offset,记录该条消息在【CommitLog】中的位置
  • 4字节的Size,记录该条消息的大小
  • 8字节的Tag Hash Code,记录消息的【Tag】的哈希值(订阅时如果指定【Tag】,会根据HashCode快速查找订阅的消息)

consumeQueue存储单元

IndexFile

除了【CommitLog】和【ConsumeQueue】之外,RocketMQ还保存了【IndexFile】用于查找消息。

索引文件

【IndexFile】(索引⽂件)提供了⼀种可以通过key或时间区间来查询消息的⽅法。

Index⽂件的存储位置是:$HOME /store/index/${fileName},⽂件名fileName是以创建时的时间戳命名的,固定的单个【IndexFile】⽂件⼤⼩约为400M,⼀个【IndexFile】可以保存2KW个索引,【IndexFile】的底层存储设计为在⽂件系统中实现HashMap结构,故rocketmq的索引⽂件其底层实现为hash索引。

消息清理

消息队列作为消息中间件,虽然提供了消息持久化功能,但是消息不能无休止的保存在消息队列中,RocketMQ中就有两种消息清理机制:

  1. 按时间清理,RocketMQ默认会清理3天前的【CommitLog】⽂件。
  2. 按磁盘⽔位清理,当磁盘使⽤量到达磁盘容量75%,开始清理最⽼的【CommitLog】⽂件。

刷盘机制

上面说到了【CommitLog】是存储到硬盘上的,这样既能保证断掉后恢复,⼜可以让存储的消息超出内存的限制。RocketMQ为了提⾼性能,会尽可能地保证磁盘的顺序写。

【CommitLog】数据落盘时,有两种写磁盘⽅式:分布式同步刷盘和异步刷盘。
commitLog数据落盘方式

同步刷盘

在返回写成功状态时,消息已经被写⼊磁盘。具体流程是,消息写⼊内存的PAGECACHE后,⽴刻通过之刷盘线程刷盘,然后等待刷盘完成。刷盘线程执⾏完成后唤醒等待的线程,返回消息写成功的状态。

配置(broker.conf):

flushDiskType=SYNC_FLUSH

优点:
可靠性⾼、安全性⾼

异步刷盘

在返回写成功状态时,消息可能只是被写⼊了内存的PAGECACHE,写操作的返回快,吞吐量达;当内存⾥的消息量积累到⼀定程度是,统⼀触发写磁盘动作,快速写⼊(由操作系统确定写入时机)。

配置(broker.conf):

flushDiskType=ASYNC_FLUSH

优点:
效率性能⾼,⾼吞吐量

高可用

架构的高可用

RocketMQ分布式集群是通过Master和Slave的配合达到⾼可⽤性的。

Master和Slave的区别:在【Broker】的配置⽂件中,参数brokerId的值为0表明这个【Broker】是Master,⼤于0表明这个【Broker】是Slave,同时brokerRole参数也会说明这个【Broker】是Master还是Slave。

Master⻆⾊的【Broker】⽀持读和写,Slave⻆⾊的【Broker】仅⽀持读,也就是【Producer】只能和Master⻆
⾊的【Broker】连接写⼊消息;【Consumer】可以连接Master⻆⾊的【Broker】,也可以连接Slave⻆⾊的【Broker】来读取消息。

消息发送⾼可⽤

在创建【Topic】的时候,把【Topic】的多个队列创建在多个【Broker】组上(相同【Broker】名称,不同【Broker】Id的机器组成⼀个【Broker】组),这样当⼀个【Broker】组的Master不可⽤后,其他组的Master仍然可⽤,【Producer】仍然可⽤发送消息。

RocketMQ⽬前还不⽀持把Slave⾃动转成Master,如果机器资源不⾜,需要把Slave转成Master,则需要⼿动停⽌Slave⻆⾊的【Broker】,更改配置⽂件,⽤的新配置⽂件启动【Broker】。

消息发送队列负载默认采⽤轮询机制,消息发送时默认选择重试机制来保证消息发送的⾼可⽤。当【Broker】宕机后,虽然消息发送者⽆法第⼀时间感知【Broker】宕机,但是当消息发送者向【Broker】发送消息返回异常后,⽣产者会在接下来⼀定时间内,例如5分钟内不会再次选择该【Broker】上的队列,这样就规避了发⽣故障的【Broker】,结合重试机制,巧妙实现消息发送的⾼可⽤。

消息消费⾼可⽤

在【Consumer】的配置⽂件中,并不需要设置是从master读还是从slave读,当master不可⽤或者繁忙的时候,【Consumer】会被⾃动切换到slave读。有了⾃动切换【Consumer】这种机制,当⼀个master⻆⾊的机器出现故障后,【Consumer】仍然可以从slave读取消息,不影响【Consumer】程序,这样就达到消费端的⾼可⽤性。

消息主从复制

如果⼀个【Broker】组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制⽅式。

同步复制

同步复制⽅式是等Master和Slave均写成功后才反馈给客户端写成功状态,在同步复制⽅式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增⼤数据写⼊延迟,降低系统吞吐量。

异步复制

异步复制是只要Master写成功即可反馈给客户端写成功状态,同时异步线程将消息同步给Slave,在异步复制⽅式下,系统拥有较低的延迟和较⾼的吞吐量,但是如果Master出了故障,有些数据因
为没有被写⼊Slave,有可能会丢失。

刷盘与主从复制结合

实际应⽤场景中要结合业务场景,合理设置刷盘⽅式和主从复制⽅式,尤其是SYNC_FLUSH⽅式,由于频繁地触发磁盘写操作,会明显降低性能。

通常情况下,把master和Slave配置成ASYNC_FLUSH的刷盘⽅式,主从之间配置成SYNC_MASTER的复制⽅式,这样即使有⼀台机器出故障,仍然能保证数据不丢,是个不错的选择。

消息投递机制

RocketMQ消息发送及消费只有⼀种模式:topic主题模式。
topic主题模式

RocketMQ的消息投递分分为两种:

  1. 【Producer】往【Broker】中投递。
  2. 【Broker】往【Consumer】投递(这种投递的说法是从消息传递的⻆度阐述的,实际上底层是【Consumer】从【Broker】中拉取消息)。

RocketMQ的消息模型整体并不复杂,如下图所示:
消息投递模型
⼀个【Topic】可能对应多个实际的消息队列(MessgeQueue)在底层实现上,为了提⾼MQ的可⽤性和灵活性,⼀个【Topic】在实际存储的过程中,采⽤了多队列的⽅式,具体形式如上图所示。每个消息队列在使⽤中应当保证先⼊先出(FIFO,First In First Out)的⽅式进⾏消费。

那么,基于这种模型,就会引申出两个问题:

  1. ⽣产者在发送相同【Topic】的消息时,消息体应当被放置到哪⼀个消息队列(MessageQueue)中?
  2. 消费者在消费消息时,应当从哪些消息队列中拉取消息?

⽣产者(Producer)投递消息的策略

默认投递⽅式:基于Queue队列轮询算法投递

默认情况下,采⽤了最简单的轮询算法,这种算法有个很好的特性就是,保证每⼀个【Queue】的消息投递数量尽可能均匀,【Producer】端,每个实例在发消息的时候,默认会轮询所有的【Queue】发送,以达到让消息平均落在在不同的【Queue】上,⽽由于【Queue】可以散落在不同的【Queue】,所以消息就发送到不同的【Broker】上,如下图:
producer投递消息到broker
图中箭头线条上的标号代表顺序,【Producer】会把第⼀条消息发送⾄Queue 0,然后第⼆条消息发送⾄Queue 1,依次类推。

默认投递⽅式的增强:基于Queue队列轮询算法和消息投递延迟最⼩的策略投递

默认的投递⽅式⽐较简单,但是也暴露了⼀个问题,就是有些【Queue】可能由于⾃身数量积压等原因,可能在投递的过程⽐较⻓,对于这样的【Queue】会影响后续投递的效果。

基于这种现象,RocketMQ在每发送⼀个MQ消息后,都会统计⼀下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些【Queue】投递的速度快。

在这种场景下,会优先使⽤消息投递延迟最⼩的策略,如果没有⽣效,再使⽤【Queue】轮询的⽅式。

顺序消息的投递⽅式

上述两种投递⽅式属于对消息投递的时序性没有要求的场景,这种投递的速度和效率⽐较⾼。⽽在有些场景下,需要保证同类型消息投递和消费的顺序性。

例如,假设现在有TOPIC TOPIC_SALE_ORDER,该【Topic】下有4个【Queue】,该【Topic】⽤于传递订单的状态变迁,假设订单有状态:未⽀付、已⽀付、发货中(处理中)、发货成功、发货失败

在时序上,⽣产者从时序上可以⽣成如下⼏个消息:

订单T0000001:未⽀付 --> 订单T0000001:已⽀付 --> 订单T0000001:发货中(处理中) --> 订单T0000001:发货失败

消息发送到MQ中之后,可能由于轮询投递的原因,消息在MQ的存储可能如下:
生产者投递消息无序问题
这种情况下,我们希望消费者消费消息的顺序和我们发送是⼀致的,然⽽,有上述MQ的投递和消费机制,我们⽆法保证顺序是正确的,对于顺序异常的消息,消费者即使有⼀定的状态容错,也不能完全处理好这么多种随机出现组合情况。

基于上述的情况,RockeMQ采⽤了这种实现⽅案:对于相同订单号的消息,通过⼀定的策略,将其放置在⼀个【Queue】中,然后消费者再采⽤⼀定的策略(⼀个线程独⽴处理⼀个【Queue】,保证处理消息的顺序性),能够保证消费的顺序性。
顺序消息实现原理
我们先看⽣产者是如何能将相同订单号的消息发送到同⼀个【Queue】的:
⽣产者在消息投递的过程中,使⽤了MessageQueueSelector作为队列选择的策略接⼝,其定义如下:

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

相应地,⽬前RocketMQ提供了如下⼏种实现:
MessageQueueSelector继承体系

投递策略 策略实现类 说明
随机分配策略 SelectMessageQueueByRandom 使⽤了简单的随机数选择算法
基于Hash分配策略 SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的⼤⼩取余数,得到消息队列的index
基于机器机房位置分配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,基本的⽬的应该是机器的就近原则分配

像上面三种明显无法达到要求,那么就需要开发人员实现MessageQueueSelector然后按照规则让同一个体的消息到达同一个【Queue】。

new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}

消费者分配队列策略

RocketMQ对于消费者消费消息有两种形式:

  • BROADCASTING:⼴播式消费,这种模式下,每一个消费者都会连接到订阅的【Topic】的所有队列,这样的话,每⼀个消费者都可以接收到所有的消息,【Offset】由【Consumer】本地维护。
  • CLUSTERING:集群式消费,这种模式下,同一个【ConsumerGroup】下的每一个【Consumer】会被分配到这个【Topic】下的一些队列(分配策略下面会详细讲),这样的话每个【Consumer】只能消费到它订阅的队列上的消息,【Offset】由【Broker】维护。
广播模式

⼴播模式下要求⼀条消息需要投递到⼀个消费组下⾯所有的消费者实例。
在实现上,就是在【consumer】分配【queue】的时候,会所有【consumer】都分到所有的【queue】。
广播消费模式

集群模式

使⽤了消费模式为MessageModel.CLUSTERING进⾏消费时,需要保证⼀个消息在整个集群中只需要被消费⼀次。实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过【queue】分配给消费者的⽅式完成的,也就是说,消息分配的单位是消息所在的【queue】。

在集群消费模式下,每条消息只需要投递到订阅这个【topic】的【ConsumerGroup】下的⼀个实例即可。RocketMQ采⽤主动拉取的⽅式拉取并消费消息,在拉取的时候需要明确指定拉取哪⼀条【queue】。

RocketMQ定义了策略接⼝AllocateMessageQueueStrategy,对于给定的消费者分组、消息队列列表、消费者列表,当前消费者应当被分配到哪些【queue】,定义如下:

public interface AllocateMessageQueueStrategy {

    /**
     * 给ID为currentCID的consumer分配Queue
     *
     * @param consumerGroup 当前消费者组
     * @param currentCID 当前消费者ID
     * @param mqAll 该topic下的所有队列
     * @param cidAll 当前消费者组内的所有消费者ID
     * @return 分配结果
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 策略名称
     */
    String getName();
}

RocketMQ自带了6种分配策略:
算法名称 | 含义
--- | ---
AllocateMessageQueueAveragely | 平均分配算法
AllocateMessageQueueAveragelyByCircle | 基于环形平均分配算法
AllocateMachineRoomNearby | 基于机房临近原则算法
AllocateMessageQueueByMachineRoom | 基于机房分配算法
AllocateMessageQueueConsistentHash | 基于⼀致性hash算法
AllocateMessageQueueByConfig | 基于配置分配算法

为了讲述清楚上述算法的基本原理,我们先假设⼀个例⼦,下⾯所有的算法将基于这个例⼦讲解(这里不考虑【Queue】在哪个【Broker】)。
consumer分配队列示例

AllocateMessageQueueAveragely-平均分配算法

这⾥所谓的平均分配算法,并不是指的严格意义上的完全平均,如上⾯的例⼦中,10个【queue】,⽽消费者只有4个,⽆法整除,除了整除之外的多出来的【queue】,将依次根据消费者的顺序均摊。

按照上述例⼦来看,10/4=2,即表示每个消费者平均均摊2个【queue】;⽽10%4=2,即除了均摊之外,多出来2个【queue】还没有分配,那么,根据消费者的顺序consumer-1、consumer-2、consumer-3、consumer-4,则多出来的2个【queue】将分别给consumer-1consumer-2
consumer分配队列的平均分配策略

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // 当前consumer在所有consumer中的索引
        int index = cidAll.indexOf(currentCID);
        // queue和consumer的余数
        int mod = mqAll.size() % cidAll.size();
        // 每个consumer平均可以拥有几个queue
        // 此处逻辑:
        // 1. 如果queue数量小于consumer数量,那么每个consumer只能拿到一个queue,且有的consumer拿不到
        // 2. queue数量大于consumer且mod=0时,就是每个consumer可以拿到相同数量的queue 比如:0-10分给consumer1;11-20分给consumer2...
        // 3. queue数量大于consumer且mod<>0时,两种情况:1.index<mod,就会多分1个;2.相反就会少分1个
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}
AllocateMessageQueueAveragelyByCircle-基于环形平均算法

环形平均算法,是指根据消费者的顺序,依次在由【queue】组成的环形图中逐个分配。具体流程如下所示:

其最终分配到每个【consumer】的队列个数与平均分配算法相同,区别在于:

  • 平均分配算法是在分配前先计算好每个【consumer】需要分配的队列个数,然后依次给每个【consumer】分配,比如上例中的分配是计算好consumer-1分配3个,然后把queue#0、queue#1、queue#2分配给consumer-1...
  • 基于环形平均算法是以队列为基础,顺序取出队列然后将这个队列顺序的分配给【consumer】。

基于环形的平均分配

public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                       List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
            return result;
        }
        // 当前consumer在所有consumer中的索引
        int index = cidAll.indexOf(currentCID);
        // 这里实现形式和AllocateMessageQueueAveragely类似,区别在于分配的queue的编号
        // avg模式下,分配queue是按批分配,比如:1-5号queue分配给consumer1...
        // 这里,分配queue是滚动式分配,比如:1、6、11、16、21号queue分配给consumer1...
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}
AllocateMachineRoomNearby-基于机房临近原则算法

该算法使⽤了装饰模式,对分配策略进⾏了增强。⼀般在⽣产环境,如果是微服务架构下,RocketMQ集群的部署可能是在不同的机房中部署,其基本结构可能如下图所示:
机房临近原则分配策略

对于跨机房的场景,会存在⽹络、稳定性和隔离性的原因,该算法会根据【queue】的部署机房位置和【consumer】的位置,过滤出当前【consumer】相同机房的【queue】队列,然后再结合实际应用的分配策略分配。

public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
    private final MachineRoomResolver machineRoomResolver;

    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MachineRoomResolver machineRoomResolver) throws NullPointerException {
        if (allocateMessageQueueStrategy == null) {
            throw new NullPointerException("allocateMessageQueueStrategy is null");
        }

        if (machineRoomResolver == null) {
            throw new NullPointerException("machineRoomResolver is null");
        }

        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.machineRoomResolver = machineRoomResolver;
    }

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // 按照机房把MessageQueue分组
        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
        for (MessageQueue mq : mqAll) {
            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
                if (mr2Mq.get(brokerMachineRoom) == null) {
                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
                }
                mr2Mq.get(brokerMachineRoom).add(mq);
            } else {
                throw new IllegalArgumentException("Machine room is null for mq " + mq);
            }
        }

        // 按照机房把consumer分组
        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
        for (String cid : cidAll) {
            String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
                if (mr2c.get(consumerMachineRoom) == null) {
                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
                }
                mr2c.get(consumerMachineRoom).add(cid);
            } else {
                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
            }
        }

        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

        // 找到当前consumer所在的机房
        // 1.allocate the mq that deploy in the same machine room with the current consumer
        String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
        // 列出当前机房部署的broker(MessageQueue)和consumer
        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
        // (装饰模式)在当前机房内再使用内嵌的strategy(平均、环状平均、一致性hash...)处理
        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
        }

        // 如果broker所在的机房没有部署consumer(没有当前机房),就按照内嵌的strategy处理
        // 2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
        for (String machineRoom : mr2Mq.keySet()) {
            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
            }
        }

        return allocateResults;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
    }

    /**
     * A resolver object to determine which machine room do the message queues or clients are deployed in.
     *
     * AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room.
     *
     * The result returned from the implemented method CANNOT be null.
     */
    public interface MachineRoomResolver {
        String brokerDeployIn(MessageQueue messageQueue);

        String consumerDeployIn(String clientID);
    }
}
AllocateMessageQueueByMachineRoom-基于机房分配算法

该算法适⽤于属于同⼀个机房内部的消息,去分配【queue】。这种⽅式⾮常明确,对比机房临近分配算法的场景,这种更彻底,直接指定某些【queue】分配给某个机房的【consumer】。这种⽅式具有强约定性,⽐如【broker】名称按照机房的名称进⾏拼接,在算法中通过约定解析进⾏分配。

public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    private Set<String> consumeridcs;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 按照命名规则,指定某些queue会分配到该机房的consumer
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            // 指定queue规则
            String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }

        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        for (int i = startIndex; i < endIndex; i++) {
            result.add(mqAll.get(i));
        }
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM";
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
AllocateMessageQueueConsistentHash-基于⼀致性hash算法

使⽤这种算法,会将【consumer】作为Node节点构造成⼀个hash环,然后【queue】队列通过这个hash环来决定被分配给哪个【consumer】消费者。

其实现流程如下图:
consumer一致性hash分配队列

  1. 构造一个0-2^32-1数字构成的hash环,最小值是最大值的下一个元素。
  2. 将每个【consumer】按照hash算法计算后插入到环中。
  3. 按照同样的hash算法计算每个队列,然后找到这个hash值后面的第一个【consumer】,将这个队列分配给这个【consumer】。

一致性hash算法的优势在于,每次新增一个【consumer】后需要重新分配的队列数量尽可能少。

AllocateMessageQueueByConfig–基于配置分配算法

这种算法单纯基于配置的,⾮常简单,实际使⽤中可能⽤途不⼤。

总结

默认情况下,消费者使⽤的是AllocateMessageQueueAveragely算法,也可以⾃⼰指定:

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    ...
    public DefaultMQPushConsumer() {
        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
    }
    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }
    ...
}

消费方式

在RocketMQ⾥,【consumer】被分为2类:

  • MQPullConsumer
  • MQPushConsumer

其实本质都是拉⽅式pull,即【consumer】轮询从【broker】拉取消息。RocketMQ的push模式是基于pull模式实现的,它没有实现真正的push

push模式

push⽅式⾥,【consumer】把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener#consumeMessage()来消费,对⽤户⽽⾔,感觉消息是被推送过来的。

pull模式

pull⽅式⾥,取消息的过程需要⽤户⾃⼰写,⾸先通过打算消费的【Topic】拿到【MessageQueue】的集合,遍历【MessageQueue】集合,然后针对每个【MessageQueue】批量取消息,⼀次取完后,记录该队列下⼀次要取的起始【offset】,直到取完了,再换另⼀个【MessageQueue】。

区别

从下⾯这张简单的示意图也可以⼤致看出其中的差别:
push模式和pull模式区别
这里不是RocketMQ的实现
push的⽅式是:消息发送到broker后,则【broker】会主动把消息推送给【consumer】。
pull的⽅式是:消息投递到broker后,消费端需要主动去【broker】上拉消息,即需要⼿动写代码实现。

两种⽅式的优缺点对⽐:

  • push模式实时性⾼,但增加服务端负载,消费端能⼒不同,如果push的速度过快,消费端会出现很多问题,没有考虑客户端的消费能⼒。
  • pull模式消费者从server端拉消息,主动权在消费端,可控性好,但是时间间隔不好设置,间隔太短,则空请求会多,浪费资源,间隔太⻓,则消息不能及时处理。
RocketMQ实现

RocektMQ并没有使⽤推模式或者拉模式,⽽是使⽤了结合两者优点的⻓轮询机制,它本质上还是拉模式,但服务端能够通过hold住请求的⽅式减少客户端对服务端的频繁访问,从⽽提⾼资源利⽤率及消息响应实时性。

长轮训机制的实现:

  1. 【consumer】发送拉取消息请求
  2. 【broker】hold住请求,直到有新消息再返回
  3. 如果超过请求超时时间(默认30s),即认为请求超时,【consumer】再次发起请求

消息消费重试

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,RocketMQ会⾃动不断地进⾏消息重试(每次间隔时间为1秒)。这时,应⽤会出现消息消费被阻塞的情况。因此,建议您使⽤顺序消息时,务必保证应⽤能够及时监控并处理消费失败的情况,避免阻塞现象的发⽣。

其他消息重试

触发重试条件

对于其他无序的消息(普通、定时、延时、事务等),当消息消费失败后,可以通过返回状态达到消息重试的结果。

  1. 广播消费模式下,消息消费不会进行重试。
  2. 集群消费模式下,消息消费失败后期望消息重试,需要在消息监听器接⼝的实现中明确进⾏配置(三种⽅式任选⼀种):
    • 消费端返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
    • 消费端抛出异常
    • 返回为空
      consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        doConsumeMessage(message);
        //⽅式 1:返回RECONSUME_LATER,消息将重试
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        //⽅式 2:返回null,消息将重试
        return null;
        //⽅式 3:直接抛出异常,消息将重试
        throw new RuntimeException("Consumer Message exception");
      }
      });
      
重试次数

RocketMQ默认允许每条消息最多重试【16】次,每次重试的间隔时间如下:

第⼏次重试 与上次重试的间隔时间 第⼏次重试 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1⼩时
8 6分钟 16 2⼩时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在⼀直消费失败的前提下,将会在接下来的4⼩时46分钟之内进⾏16次重试,超过这个时间范围消息将不再重试。

⾃定义消息最⼤重试次数

RocketMQ允许【Consumer】启动的时候设置最⼤重试次数,重试时间间隔将按照以下策略:

  • 最⼤重试次数⼩于等于【16】次,则重试时间间隔按照默认时间表。
  • 最⼤重试次数⼤于【16】次,超过【16】次的重试时间间隔均为每次【2⼩时】。

配置⽅式如下:

DefaultMQPushConsumer consumer = newDefaultMQPushConsumer("please_rename_unique_group_name");
//配置对应 Group ID 的最⼤消息重试次数为 20 次
consumer.setMaxReconsumeTimes(20);

注意:

  • 如果只对相同Group ID下两个【Consumer】实例中的其中⼀个设置了MaxReconsumeTimes,那么该配置对两个【Consumer】实例均⽣效。
  • 配置采⽤覆盖的⽅式⽣效,即最后启动的【Consumer】实例会覆盖之前的启动实例的配置。
消费消息时获取重试次数

消费者收到消息后,可从消息中获取消息的重试次数:

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for(MessageExt msg:msgs){
        // 获取重试次数
        System.out.println(msg.getReconsumeTimes());
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
实现原理

RocketMQ会为每个消费组都设置一个【Topic】名称为%RETRY%+consumerGroup的重试队列(这里需要注意的是,这个【Topic】的重试队列是针对消费组,而不是针对每个【Topic】设置的),用于暂时保存因为各种异常而导致【Consumer】端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。

RocketMQ对于重试消息的处理是先保存至【Topic】名称为SCHEDULE_TOPIC_XXXX的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%+consumerGroup的重试队列中。

从这里我们可以看到,消息重试间隔实际上是使用了延时消息,这也就解释了为什么消息消费重试间隔时间与延时消息的间隔时间是相同的。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、【consumer】负载变化也会导致重复消息。如下方法可以设置消息重投策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的【broker】,尝试向其他【broker】发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他【broker】,仅在同一个【broker】上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他【broker】,默认false。十分重要消息可以开启。

流量控制

生产者流控,因为【broker】处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控

  • 【CommitLog】文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
  • 如果开启transientStorePoolEnable==true,且【broker】为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
  • 【broker】每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
  • 【broker】通过拒绝send 请求方式实现流量控制。

这里需要注意的是:生产者流控,不会尝试消息重投。

消费者流控

  • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。

消费者流控的结果是降低拉取频率。

死信队列

当消费者消费失败达到最⼤重试次数(默认16次),消息还是消费失败,RocketMQ不会将该消息丢弃⽽是会把它保存到死信队列中。

这种不能被消费者正常处理的消息我们⼀般称之为死信消息(Dead-Letter Message),将存储死信消息的队列称之为死信队列(Dead-Letter Queue,DLQ)。

死信队列特征

死信Topic的命名为:%DLQ% + Consumer组名,如:%DLQ%online-tst

⾸先看下死信消息具备的特点:

  • 死信队列中的消息不会再被消费者正常消费,也就是⼀般情况下DLQ是消费者不可⻅的
  • 死信存储有效期与正常消息相同,均为3天,3天后会被⾃动删除。因此,我们要保证在死信消息产⽣后的3天内对其进⾏及时处理

死信队列则具有以下特性:

  • 每个死信队列对应⼀个Group ID,也就是每个消费者组都有⼀个死信队列,⽽不是对应单个消费者实例
  • 如果⼀个Group ID未产⽣死信消息,RocketMQ不会为其创建相应的死信队列
  • ⼀个死信队列包含了对应Group ID下产⽣的所有死信消息,不论该消息属于哪个【Topic】,也就是对于某个消费者组,它的所有的死信共享⼀个死信队列

处理死信消息

实际上,当⼀条消息进⼊死信队列,就意味着某些因素导致消费者⽆法正常消费该消息(⽐如,代码中存在bug/数据库宕机等)。

因此,对于死信消息,通常需要开发进⾏特殊处理。最关键的步骤是要排查可疑因素并解决代码中存在的bug。

  • 控制台重新发送该消息,让消费者对该消息重新消费⼀次。
  • 除了通过console⼿动推送消息进⾏消费,我们也可以查询死信中消息,将消息重新投递到原【topic】进⾏重新消费。

消息幂等

为了防⽌消息重复消费导致业务处理异常,RocketMQ的消费者在接收到消息后,有必要根据业务上的唯⼀Key对消息做幂等处理。

概念

如果有⼀个操作,多次执⾏与⼀次执⾏所产⽣的影响是相同的,我们就称这个操作是幂等的。

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费⼀次的结果是相同的,并且多次消费并未对业务系统产⽣任何负⾯影响,那么这整个过程就可实现消息幂等。

场景

在互联⽹应⽤中,尤其在⽹络不稳定的情况下,RocketMQ的消息有可能会出现重复。所以有必要在使用RocketMQ过程中确保消息幂等性。

发送消息时消息重复

发送消息时消息重复

当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且消息的UniqueKey也相同的消息。

消费消息时消息重复

消费消息时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。为了保证消息⾄少被消费⼀次,RocketMQ的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且消息的MessageOffsetIdUniqueKey都相同的消息。

rebalance导致的消息重复

当RocketMQ的【Broker】或客户端重启、扩容或缩容时,会触发rebalance,此时消费者可能会收到重复消息。

这里会在后面rebalance中详细说明。

消息幂等实现

幂等令牌

幂等令牌是⽣产者和消费者两者中的既定协议,在业务中通常是具备唯⼀业务标识的字符串,如:订单号、流⽔号等。且⼀般由⽣产者端⽣成并传递给消费者端。

处理唯⼀性的确保
  1. 业务操作之前进⾏状态查询
    消费端开始执⾏业务操作时,通过幂等令牌⾸先进⾏业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进⾏处理。那么我们只需要在消费逻辑执⾏之前通过订单号进⾏订单状态查询,⼀旦获取到确定的订单状态则对消息进⾏提交,通知【broker】消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS
  2. 唯⼀性约束保证最后⼀道防线
    RocketMQ不能保证⼀定不出现重复的数据,如:并发插⼊的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插⼊操作,因此我们务必要对幂等令牌添加唯⼀性索引,这样就能够保证在并发场景下也能保证数据的唯⼀性。
  3. 引⼊锁机制
    如果是并发更新的情况,没有使⽤悲观锁、乐观锁、分布式锁等机制的前提下,进⾏更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
    ⾼并发下,建议通过状态机的⽅式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提⾼因此不建议使⽤。

RocketMQ结合业务处理消息幂等

RocketMQ能够保证消息不丢失但不保证消息不重复。

如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到⾼可⽤以及⾼性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较⼤的时间和空间等资源代价,收益并不明显。
RocketMQ考虑到正常情况下出现重复消息的概率其实是很⼩的,因此RocketMQ将消息幂等操作交给了业务⽅处理。

因为消息的UniqueKey有可能出现冲突(重复)的情况,因此不建议通过UniqueKey作为处理依据,⽽最好的⽅式是以业务唯⼀标识作为幂等处理的关键依据如:订单号、流⽔号等作为幂等处理的关键依据。⽽业务的唯⼀标识可以通过消息Key设置。
以⽀付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

Message message = new Message();
message.setKeys("ORDERID_100");
SendResult sendResult = producer.send(message);

消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for(MessageExt msg:msgs){
        String key = msg.getKeys();
        // 根据业务唯⼀标识的 Key 做幂等处理
        // 结合上面说到的通过锁机制、唯一性约束、业务处理前先查询状态等方法解决幂等问题
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

消息堆积

消息堆积本质

⽣产者的⽣产速度 >> 消费者的处理速度

  1. 由于生产者生产速度的增加导致生产速度比消费速度大
  2. 消费速度变慢,⽐如消费者实例IO阻塞严重或者宕机

消息堆积本质

处理消息堆积

处理消息堆积主要从两个方面入手:

  1. 如何通过解决系统问题、优化代码来避免消息堆积
  2. 已经发生了堆积时,怎么解决
发送端性能优化

从消息堆积若⼲原因来看,消息堆积的原因主要在消费端处理上,本身⽣产者端应该遵循的原则应该是尽可能快的将消息发送到【Broker】中去,因此发送端除了业务处理时批量发送暂⽆好的⼿段优化,⽽且并不是所有的业务处理都⽀持批量发送和批量接收处理。

消费端性能优化

在设计系统的时候,⼀定要保证消费端的消费性能要⾼于⽣产端的发送性能,这样的系统才能健康的持续运⾏。

所以针对消息堆积问题消费端主要从两个方面:

  1. 增加单个消费者处理能⼒,尽可能的优化消息处理业务逻辑的能
    ⼒,减少不必要的⾮业务相关处理时间消耗。
    消费端增加单点处理能力
  2. 如果单点处理能力已经很难再有提升的情况下,也可以通过⽔平扩容,增加消费端的并发数来提升总体的消费性能。(这里消费者数量不超过队列数量)
如何快速处理

如果消息已经堆积了,线上如何快速处理。对于系统发⽣消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可⽤性是⾸先要解决的问题。

  1. 消费端扩容,这是基本的快速解决方式。
  2. 服务降级,快速失败,不⼀定适⽤所有业务场景。
  3. 跳过⾮重要消息,发⽣消息堆积时,如果消费速度⼀直追不上发送速度,可以选择丢弃不重要的消息。
  4. 异常监控,属于运维层⾯措施。

消息查询

RocketMQ提供了三种消息查询方式:

  1. Message Key查询:消息的key是业务开发在发送消息之前⾃⾏指定的,通常会把具有业务含义,区分度⾼的字段作为消息的key,如⽤户id,订单id等。
  2. Unique Key查询:除了业务开发明确的指定消息中的key,RocketMQ⽣产者客户端在发送发送消息之前,会⾃动⽣成⼀个UNIQ_KEY,设置到消息的属性中,从逻辑上唯⼀代表⼀条消息(因为消息重投机制,可能会有多个相同Unique Key的消息)。
  3. Message Id查询:Message Id是消息发送后,在【Broker】端⽣成的,其包含了【Broker】的地址,和在【CommitLog】中的偏移信息,并会将Message Id作为发送结果的⼀部分进⾏返回。Message Id属于精确匹配,可以唯⼀定位⼀条消息,不需要使⽤哈希索引机制,查询效率更⾼。

RocketMQ有意弱化Unique KeyMessage Id的区别,对外都称之为Message Id。在通过RocketMQ的命令⾏⼯具或管理平台进⾏查询时,⼆者可以通⽤。在根据Unique Key进⾏查询时,本身是有可能查询到多条消息的,但是查询⼯具会进⾏过滤,只会返回⼀条消息。

在日常业务开发过程中,在发送/消费消息时,将这些信息记录下来,通常是记录到⽇志⽂件中,以便在出现问题时进⾏排查。

// 1. 构建消息对象Message
Message msg = new Message();
msg.setTopic("TopicA");
msg.setKeys("Key1");
msg.setBody("message body".getBytes());
try{
    // 2. 发送消息
    SendResult result = producer.send(msg);
    // 3. 打印发送结果
    log.info(result);
} catch (Exception e){
    e.printStackTrace();
}

事实上,⽤户主动设置的Key以及客户端⾃动⽣成的Unique Key,最终都会设置到Message对象的properties属性中,如下图所示:
rocketMQ发送消息过程中属性

  • KEYS:表示⽤户通过setKeys⽅法设置的消息key,
  • UNIQ_KEY:表示客户端⾃动⽣成的Unique Key。

在发送结果返回对象SendResult中包含了Unique KeyMessage Id,如下所示:

SendResult [sendStatus=SEND_OK, msgId=0A1427544F4818B4AAC27DD168880000, offsetMsgId=0A14275400002A9F00000000001F268E, messageQueue=MessageQueue[topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1173]
  • sendStatus:表示消息发送结果的状态
  • msgId:注意这⾥的命名虽然是msgId,但实际上其是Unique Key
  • offsetMsgId:【Broker】返回的Message ID。在后⽂中,未进⾏特殊说明的情况下,Message ID总是表示offsetMsgId
    messageQueue:消息发送到了哪个的队列。
    queueOffset:消息在队列中的偏移量,每次发送到⼀个队列时,offset+1。

消息查询⼯具

  • 命令⾏⼯具
  • 管理平台
  • 客户端API
命令⾏⼯具
$ sh bin/mqadmin
The most commonly used mqadmin commands are:
...
 queryMsgById 按照Message Id查询消息
 queryMsgByKey 按照Key查询消息
 queryMsgByUniqueKey 按照UNIQ_KEY查询消息
...

例如,要查询在TopicA中,key为Key-0的消息:

➜ rocketmq-all-4.7.1-bin-release bin/mqadmin queryMsgByKey -k key-0 -t TopicTest -n localhost:9876
#Message ID                         #QID         #Offset
0A1427544F20135FBAA47DCC015C0000     1             1154
0A1427544F4818B4AAC27DD168880000     2             1173

这⾥,我们看到输出结果中包含了2条记录。其中:

  • Message ID:这⾥这⼀列的名字显示有问题,实际上其代表的是Unique Key
  • QID:表示队列的ID,注意在RocketMQ中定位⼀个队列需要topic+brokerName+queueId。这⾥只显示了queueId,其实并不能知道在哪个Broker上。
  • Offset:消息在在队列中的偏移量

在查询到Unique Key之后,我们就可以使⽤另外⼀个命令:queryMsgByUniqueKey,来查询消息的具体内容。

➜ rocketmq-all-4.7.1-bin-release bin/mqadmin queryMsgByUniqueKey -i
0A1427544F4818B4AAC27DD168880000 -t TopicTest -n localhost:9876
Topic:                 TopicTest
Tags:                 [TagA]
Keys:                 [key-0]
Queue ID:             2
Queue Offset:         1173
CommitLog Offset:     2041486
Reconsume Times:     0
Born Timestamp:     2020-07-25 10:21:15,785
Store Timestamp:     2020-07-25 10:21:15,792
Born Host:             10.20.39.84:55082
Store Host:         10.20.39.84:10911
System Flag:         0
Properties:         {KEYS=key-0, UNIQ_KEY=0A1427544F4818B4AAC27DD168880000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}
Message Body Path:     /tmp/rocketmq/msgbodys/0A1427544F4818B4AAC27DD168880000

对于消息体的内容,会存储到Message Body Path字段指定到的路径中。可通过cat命令查看(仅适⽤于消息体是字符串):

➜ rocketmq-all-4.7.1-bin-release cat
/tmp/rocketmq/msgbodys/0A1427544F4818B4AAC27DD168880000
Hello RocketMQ 0%
指定消费者消费

queryMsgByUniqueKey⼦命令还接收另外两个参数:

  • -g 参数⽤于指定消费者组名称
  • -d 参数指定消费者client id

指定了这两个参数之后,消息将由消费者直接消费,⽽不是打印在控制台上。

⾸先,通过consumerStatus命令,查询出消费者组下的client id信息,如:

➜ rocketmq-all-4.7.1-bin-release bin/mqadmin consumerStatus -g please_rename_unique_group_name -n localhost:9876
001        10.20.39.84@20820        V4_3_0        1595644649539/10.20.39.84@20820

Same subscription in the same group of consumer

Rebalance OK

这⾥显示了消费者组please_rename_unique_group_name下⾯只有⼀个消费者,client id为10.20.39.84@20820。
接着我们可以在queryMsgByUniqueKey⼦命令中,添加-g和-d参数,如下所示:

➜ rocketmq-all-4.7.1-bin-release bin/mqadmin queryMsgByUniqueKey -g please_rename_unique_group_name -d 10.20.39.84@20820 -t TopicTest -i 0A1427544F4818B4AAC27DD168880000 -n localhost:9876

ConsumeMessageDirectlyResult [order=false, autoCommit=true, consumeResult=CR_SUCCESS, remark=null, spentTimeMills=0]%

可以看到,这⾥并没有打印出消息内容,取⽽代之的是消息消费的结果。

在内部,主要是分为3个步骤来完成让指定消费者来消费这条消息,如下图所示:
命令行指定consumer消费一条消息

  1. 命令⾏⼯具给所有【Broker】发起QUERY_MESSAGE请求查询消息,因为并不知道UNIQ_KEY这条消息在哪个【Broker】上,且最多只会返回⼀条消息,如果超过1条其他会过滤掉;如果查询不到就直接报错。
  2. 根据消息中包含了Store Host信息,也就是消息存储在哪个【Broker】上,接来下命令⾏⼯具会直接给这个【Broker】发起CONSUME_MESSAGE_DIRECTLY请求,这个请求会携带msgId,group和client id的信息。
  3. 【Broker】接收到这个请求,查询出消息内容后,主动给消费者发送CONSUME_MESSAGE_DIRECTLY通知请求,注意虽然与第2步使⽤了同⼀个请求码,但不同的是这个请求中包含了消息体的内容,消费者可直接处理。注意:这⾥并不是将消息重新发送到【Topic】中,否则订阅这个【Topic】的所有消费者组,都会重新消费这条消息。
管理平台
根据Topic时间范围查询

按【Topic】查询属于范围查询,不推荐使⽤,因为时间范围内消息很多,不具备区分度。查询时,尽可能设置最为精确的时间区间,以便缩⼩查询范围,提⾼速度。最多返回2000条数据。
rocketMQ-console根据topic和时间查询消息

根据Message Key查询

Message Key查询属于模糊查询,仅适⽤于没有记录Message ID但是设置了具有区分度的Message Key的情况。⽬前,根据Message Key查询,有⼀个很⼤局限性:不能指定时间范围,且最多返回64条数据。如果⽤户指定的key重复率⽐较⾼的话,就有可能搜不到。
rocketMQ-console

根据Message Id查询

Message ID查询属于精确查询,速度快,精确匹配,只会返回⼀条结果,推荐使⽤。这里的Message Id既可以是Unique Key也可以是Message Offset Id,如果是Unique Key并且有多条数据时,会进行过滤。
根据MessageId查询

客户端API查询

除了通过命令⾏⼯具和管理平台,还可以通过客户端API的⽅式来进⾏查询,这其实是最本质的⽅式,命令⾏⼯具和管理平台的查询功能都是基于此实现。

常⽤的DefaultMQProducer、DefaultMQPushConsumer等都实现了org.apache.rocketmq.client.MQAdmin接口,其定义了如下几个方法:

public interface MQAdmin {
    ...
    // msgId参数:仅接收SendResult中的offsetMsgId,返回单条消息
    MessageExt viewMessage(final String msgId);
    // msgId参数:传⼊SendResult中的offsetMsgId、uniqueKey都可以,返回单条消息
    MessageExt viewMessage(String topic, String msgId);
    // 在指定topic下,根据key进⾏查询,并指定最⼤返回条数,以及开始和结束时间
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end);
    ...
}

可以看到都是通过MQAdmin接口实现的消息查询功能。

先来看两个重载方法viewMessage(final String msgId)viewMessage(String topic, String msgId)

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
}

public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        MessageDecoder.decodeMessageId(msgId);
        return this.viewMessage(msgId);
    } catch (Exception e) {
        log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
    }
    return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
}

从中可以看到viewMessage(String topic, String msgId)方法的实现其实底层还是使用了viewMessage(final String msgId),该方法其实就是将offsetMsgId解析成消息所在的【broker】信息和【CommitLog Offset】。
MQAdminImpl#viewMessage

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    //1 根据msgId解码成MessageId对象
    MessageId messageId = null;
    try {
        messageId = MessageDecoder.decodeMessageId(msgId);
    } catch (Exception e) {
        throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
    }
    //2 根据MessageId中的Broker地址和commit log offset信息进行查询
    return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
        messageId.getOffset(), timeoutMillis);
}

如果无法解析则认为msgId参数是Unique Key,再按照topic + uniqueKey的组合查询。前⾯提到,Unique Key只是从逻辑上代表⼀条消息,实际上在【Broker】端可能存储了多条,因此在当做Unique Key进⾏查询时,会进⾏过滤,只取其中⼀条。源码如下所示:
MQAdminImpl#queryMessageByUniqKey

public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
    // 根据uniqKey进行查询
    QueryResult qr = this.queryMessage(topic, uniqKey, 32,
        MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
    // 对查询结果进行过滤,最多只取一条
    if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
        return qr.getMessageList().get(0);
    } else {
        return null;
    }
}

最后再来看queryMessage方法,通过key来查询消息,从代码跟踪上来看最后和queryMessageByUniqKey相同都是走到了MQAdminImpl#queryMessage方法,通过【Broker】上的索引查询消息。

MessageId、UniqueKey和MessageKey

Unqiue KeyMessage Key都需要都是在消息发送时写入到消息属性中的,通过它们利⽤RocketMQ的哈希索引机制来完成消息查询,Message Id是在【Broker】端⽣成的,其包含了【Broker】地址和commit Log offset信息,可以精确匹配⼀条消息,查询消息更好。

Unique Key

Unique Key是⽣产者发送消息之前,由RocketMQ客户端⾃动⽣成的。
DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    ...
            // 如果不是批量消息,则生成Unique Key
            if (!(msg instanceof MessageBatch)) {
                // 设置唯一编号
                MessageClientIDSetter.setUniqID(msg);
            }
    ...
}

MessageClientIDSetter#setUniqID

public static void setUniqID(final Message msg) {
    // unique key为空时才会设置
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}

从上面的代码中我们看到如果是批量消息就不会设置UniqueKey,那么批量消息是怎样的呢?

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {

        //1 将消息集合转换为MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            //2 迭代每个消息,逐一设置Unique Key
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        //3 设置批量消息的消息体
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

可以看到,对于批量消息是在send的方法之前,执行DefaultMQProducer#batch方法,在这里逐一设置UniqueKey

作用

了解Unique Key的作⽤对于我们理解消息重复的原因有很⼤的帮助。RocketMQ并不保证消息投递过程中的Exactly Once语义,即消息只会被精确消费⼀次,需要消费者⾃⼰做幂等。⽽通常导致消息重复消费的原因,主要包括:

  • ⽣产者发送时消息重复
  • 消费者Rebalance时消息重复

其中,导致⽣产者发送重复消息的原因可能是:⼀条消息已被成功发送到服务端并完成持久化,由于⽹络超时此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败,此时⽣产者将再次尝试发送消息。
在重试发送时,sendKernelImpl会被重复调⽤,意味着setUniqID⽅法会被重复调⽤,不过由于setUniqID⽅法实现中进⾏判空处理,不会重复设置。在这种情况下,消费者后续会收到两条内容相同并且Unique Key也相同的消息(offsetMsgId不同,因为对【Broker】来说存储了多次)。
生产者发送消息重复

Message Id

在RocketMQ源码中Message Id是以属性offsetMsgId存在的。Message Id是在【Broker】端⽣成的,⽤于唯⼀标识⼀条消息,在根据Message Id查询的情况下,最多只能查询到⼀条消息。
Message Id构成

public class MessageId {
    private SocketAddress address;
    private long offset;

    public MessageId(SocketAddress address, long offset) {
        this.address = address;
        this.offset = offset;
    }

    // setter & getter

}

其创建和解析都在MessageDecoder类中完成:

/**
 * 创建MessageId
 */
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    //broker地址
    input.put(addr);
    //CommitLog偏移量
    input.putLong(offset);

    return UtilAll.bytes2string(input.array());
}

/**
 * 解析MessageId
 */
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
    SocketAddress address;
    long offset;
    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

    // offset
    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);

    return new MessageId(address, offset);
}
生成Message Id时机

【Broker】端在顺序存储消息时,会通过createMessageId⽅法创建Message Id
CommitLog.DefaultAppendMessageCallback#doAppend

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                    final MessageExtBrokerInner msgInner) {
    ...                                
    // 3. 创建msgId
    if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
        msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    } else {
        msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    }
    ...
}
解析Message Id时机

⽽客户端在根据Message Id向【Broker】查询消息时,⾸先会将通过MessageDecoderdecodeMessageId⽅法,之后直接向这个【Broker】进⾏查询指定位置的消息。
MQAdminImpl#viewMessage

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    //1 根据msgId解码成MessageId对象
    MessageId messageId = null;
    try {
        messageId = MessageDecoder.decodeMessageId(msgId);
    } catch (Exception e) {
        throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
    }
    //2 根据MessageId中的Broker地址和commit log offset信息进行查询
    return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
        messageId.getOffset(), timeoutMillis);
}

Rebalance

Rebalance(再均衡)机制指的是:将⼀个【Topic】下的多个队列(或称之为分区),在同⼀个消费者组(consumer group)下的多个消费者实例(consumer)之间进⾏重新分配。

Rebalance机制本意是为了提升消息的并⾏处理能⼒。

触发rebalance时机

  1. 队列信息变化
    • broker宕机
    • broker升级等运维操作
    • 队列扩容/缩容
  2. 消费者组信息变化
    • ⽇常发布过程中的停⽌与启动
    • 消费者异常宕机
    • ⽹络异常导致消费者与【Broker】断开连接
    • 主动进⾏消费者数量扩容/缩容
    • 【Topic】订阅信息发⽣变化

rebalance会带来哪些问题

  1. 消费暂停
    在队列重新分配的过程中因为消费者不会订阅任何的队列,所以此时消息消费会暂停等到队列分配到具体的消费者后继续消费
  2. 消费突增
    与消费暂停实际是相关的,因为rebalance期间消费暂停会导致消息的堆积,等rebalance结束之后会有一段时间的消息消费高峰期
  3. 重复消费
    rebalance导致消息消费重复问题
    如上图,【consumer1】消费到【offset】为10的消息,其中,8和9的消息【consumer1】已经获取并消费,此时还没有将消息消费状态回执到【Broker】上就发生了rebalance,重分配结束之后,队列分配给了【consumer2】,这样的话会从【offset】为8的消息开始消费。

rebalance实现

队列信息和消费者组信息称之为rebalance元数据。

【Broker】维护rebalance元数据,并在⼆者信息发⽣变化时,以间接通知的方式告知消费者组下所有实例进⾏Rebalance。从这个⻆度来说,【Broker】在Rebalance过程中,是⼀个协调者的⻆⾊。

【Broker】通过实时的或者周期性的上报⾃⼰的【Topic】配置信息给【NameServer】,在【NameServer】组装成【Topic】的完整路由信息。消费者定时向【NameServer】定时拉取最新路由信息,以实现间接通知,当发现队列信息变化,触发Rebalance。

在【Broker】内部,通过元数据管理器维护了Rebalance元数据信息,如下图所示:
broker维护rebalance元数据

这些管理器,内部实现都是⼀个Map。其中:

  • 队列信息由TopicConfigManager维护
  • 消费者组信息由ConsumerManager、ConsumerOffsetManager、SubscriptionGroupManager三者共同维护
队列信息变化

通常情况下,⼀个【Topic】下的队列数量不会频繁的变化,但是如果遇到,【Topic】队列数量扩/缩容,【broker】⽇常运维时的停⽌/启动或者【broker】异常宕机,也有可能导致队列数量发⽣变化。

下图展示了⼀个RocketMQ集群双主部署模式下,某个broker宕机后,Topic路由信息的变化。
队列信息变化宕机前
此时broker-a发生宕机
队列信息变化宕机后
可以看到,在宕机前,主题TopicTest下队列分布在broker-abroker-b两个【broker】上,每个【broker】上各有16个队列。当broker-a宕机后,其路由信息会被移除,此时我们就只能看到TopicTest在【broker-b】上的路由信息。

因此,在RocketMQ中,【Topic】的路由信息实际上是动态变化的。不论是停⽌/启动/扩容导致的所有变化最终都会上报给【NameServer】。客户端向【NameServer】发送请求,获得某个【Topic】的完整路由信息。如果发现队列信息发⽣变化,则触发Reabalance。

消费者组信息变化

当消费者信息变化时,【Broker】是通知每个消费者各⾃Rebalance,即每个消费者⾃⼰给⾃⼰重新分配队列,⽽不是【Broker】将分配好的结果告知【Consumer】。

ConsumerManager

ConsumerManager是最重要的⼀个消费者组元数据管理器,其维护了某个消费者组的订阅信息,以及所有消费者实例的详细信息,并在发⽣变化时提供通知机制。

  • 数据添加:客户端通过发送HEART_BEAT请求给【Broker】,将⾃⼰添加到ConsumerManager中维护的某个消费者组中。需要注意的是,每个【Consumer】都会向所有的【Broker】进⾏⼼跳,因此每个【Broker】都维护了所有消费者的信息。
  • 数据删除:客户端正常停⽌时,发送UNREGISTER_CLIENT请求,将⾃⼰从ConsumerManager移除;此外在发⽣⽹络异常时,【Broker】也会主动将消费者从ConsumerManager中移除。
  • 数据查询:消费者可以向任意⼀个【Broker】发送GET_CONSUMER_LIST_BY_GROUP请求,来获得⼀个消费者组下的所有消费者实例信息。
ConsumerOffsetManager

事实上,通过ConsumerManager已经可以获得Rebalance时需要的消费者所有必要信息。但是还有⼀点,Rebalance时,如果某个队列重新分配给了某个消费者,那么必须接着从上⼀个消费者的位置继续开始消费,这就是ConsumerOffsetManager的作⽤。

消费者可以给【Broker】发送UPDATE_CONSUMER_OFFSET请求,来更新消费者组对于某个【Topic】的消费进度。发送QUERY_CONSUMER_OFFSET指令,来从ConsumerOffsetManager中查询消费进度。

SubscriptionGroupManager

订阅组配置管理器,内部针对每个消费者组维护⼀个SubscriptionGroupConfig。主要是为了针对消费者组进⾏⼀些运维操作。

正文到此结束