
RocketMQ Source Code Analysis

Directory Structure

  • acl: ACL stands for access control list. Access control generally involves concepts such as users, resources, permissions, and roles.
  • broker: the broker module (the broker startup process)
  • client: contains Producer and Consumer, responsible for sending and receiving messages; holds the producer- and consumer-related classes
  • common: shared code that other modules depend on
  • conf: configuration files
  • dev: developer information (not source code)
  • distribution: shell scripts and configuration, mainly used at deployment time (not source code)
  • example: RocketMQ sample code covering the various usage patterns: pull mode, push mode, broadcast mode, ordered messages, transactional messages, and so on
  • filter: base classes for message filtering
  • logappender: logging-related code
  • namesrv: can be thought of as a registry; every broker registers here, and clients fetch broker information from here
  • openmessaging: the OpenMessaging standard
  • remoting: the network communication module built on Netty, including Server and Client; the client, broker, and namesrv modules all depend on it
  • srvutil: server utility classes
  • store: the message storage implementation, responsible for storing and reading messages
  • style: code templates for a unified code style
  • test: test cases
  • tools: utility classes on which shell tools can be built to manage and inspect the MQ system
  • docs: documentation

NameServer

Startup class

NamesrvStartup

public static void main(String[] args) {
    main0(args);
}

public static NamesrvController main0(String[] args) {

    try {
        // Step 1 of NameServer startup: construct a NamesrvController instance, the core class of the NameServer
        NamesrvController controller = createNamesrvController(args);
        // Step 2: initialize and start the NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

NamesrvStartup#createNamesrvController reads the configuration files, builds the corresponding NamesrvConfig and NettyServerConfig, and uses them to instantiate a NamesrvController object. NamesrvController is the core class of the 【NameServer】.

Initializing and Starting NamesrvController

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    // initialize
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    /**
     * The last step of NameServer startup registers a `JVM` shutdown hook,
     * which runs before the `JVM` exits.
     * The hook releases resources, e.g. shutting down the `Netty` server and thread pools.
     */

    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    controller.start();

    return controller;
}

The start method shows the execution flow:

  1. Initialize the NamesrvController
  2. Register a graceful-shutdown hook via Runtime.getRuntime().addShutdownHook
  3. Start the NamesrvController

Graceful Shutdown

A JVM shutdown hook is registered that runs before the JVM exits. Let's see what it does:
NamesrvController#shutdown

public void shutdown() {
    this.remotingServer.shutdown();
    this.remotingExecutor.shutdown();
    this.scheduledExecutorService.shutdown();

    if (this.fileWatchService != null) {
        this.fileWatchService.shutdown();
    }
}

As we can see, before the process stops, the Netty server, the heartbeat thread pool, and other resources are shut down first.
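RocketMQ's ShutdownHookThread guards the release logic so it runs at most once, even if the hook fires alongside an explicit shutdown call. A minimal sketch of that pattern (an illustrative class, not RocketMQ's actual implementation):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a once-only shutdown hook: an atomic flag ensures the
// release callback (e.g. controller.shutdown()) executes exactly once.
public class ShutdownHookSketch extends Thread {
    private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
    private final Callable<Void> callback;

    public ShutdownHookSketch(Callable<Void> callback) {
        this.callback = callback;
    }

    @Override
    public void run() {
        // Only the first caller performs the actual resource release
        if (hasShutdown.compareAndSet(false, true)) {
            try {
                callback.call();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```

Such a hook would be registered via Runtime.getRuntime().addShutdownHook(new ShutdownHookSketch(...)), mirroring the code above.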

Initializing the Controller

public boolean initialize() {

    // Load the KV configuration from the `kvConfig.json` file into `KVConfigManager#configTable`
    this.kvConfigManager.load();

    // Start a `Netty` server according to `NettyServerConfig`
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Initialize the thread pool that handles `Netty` network traffic
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Schedule the heartbeat-scanning task
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            // During NamesrvController initialization, a scheduled task is registered that, starting 5 seconds after startup, scans for inactive brokers every 10 seconds.
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    /**
     * Besides the heartbeat-scanning task, another scheduled task is registered
     * that prints all the `KV` configuration every 10 minutes.
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    // ... create TLSContext

    return true;
}

This code mainly does the following:

  1. Load the configuration files and schedule periodic refreshes
  2. Create the Netty server
  3. Schedule the task that monitors 【Broker】 heartbeat status
  4. Create the TLSContext

The most important part is the periodic monitoring of 【Broker】 heartbeats:
RouteInfoManager#scanNotActiveBroker
RouteInfoManager#scanNotActiveBroker

/**
 * During `NamesrvController` initialization, a scheduled task is registered that,
 * starting 5 seconds after startup, scans for inactive `broker`s every 10 seconds.
 */
public void scanNotActiveBroker() {

    /**
     * The NameServer iterates over `RouteInfoManager#brokerLiveTable`,
     * which stores the liveness information of every `broker` in the cluster,
     * chiefly `BrokerLiveInfo#lastUpdateTimestamp`, the timestamp of the broker's
     * last heartbeat. If `lastUpdateTimestamp` has not been refreshed for more
     * than 120 seconds, the `broker` is considered dead and removed from `brokerLiveTable`.
     */
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // the time each broker last reported a heartbeat
        long last = next.getValue().getLastUpdateTimestamp();
        /**
         * BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2, i.e. 120 seconds.
         * If the reported time plus 120s is still earlier than now,
         * the broker is considered to have shut down.
         */

        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // close the long-lived connection channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // remove this broker's entry from brokerLiveTable
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // update the other routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
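The expiry check above can be sketched independently of RocketMQ (class and field names here are illustrative, not the real ones): keep a map of last-heartbeat timestamps and drop every entry older than 120 seconds.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal sketch of the broker liveness scan: any broker whose last
// heartbeat is older than 120 seconds is removed from the live table.
public class BrokerLivenessSketch {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // 120s

    // brokerAddr -> lastUpdateTimestamp
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    // Returns the number of brokers removed at time 'now'
    public int scanNotActiveBroker(long now) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_CHANNEL_EXPIRED_TIME < now) {
                it.remove(); // the real code also closes the channel and
                removed++;   // calls onChannelDestroy to fix other tables
            }
        }
        return removed;
    }
}
```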

RouteInfoManager maintains several other caches besides brokerLiveTable:

// Topic message-queue routing info; message sending is load-balanced against this table
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Basic broker info: brokerName, cluster name, and master/slave broker addresses
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker cluster info: the names of all brokers in each cluster
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker liveness info; the NameServer replaces the entry on every heartbeat
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// The FilterServer list on each broker, used for class-based message filtering
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

So after removing a 【Broker】 whose heartbeat has expired from the live table, the other caches must be updated in step:
RouteInfoManager#onChannelDestroy

public void onChannelDestroy(String remoteAddr, Channel channel) {
    // ... obtain brokerAddrFound; here brokerAddrFound = remoteAddr

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // acquire the write lock
                this.lock.writeLock().lockInterruptibly();
                // remove the broker from brokerLiveTable
                this.brokerLiveTable.remove(brokerAddrFound);
                // remove the broker from filterServerTable
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                // maintain brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    // ... remove the expired Broker from the broker address list

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        // If the removed broker was the last one in the cluster with this brokerName, drop the whole entry.
                        // Note: the brokers of one master-slave group share the same brokerName but have different broker ids.
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) {
                    // ... remove the broker from clusterAddrTable by brokerName
                }

                if (removeBrokerName) {
                    // ... remove the routes from each topic to the expired broker, i.e. notify clients that the
                    // ... broker set has changed and stop routing to it; a consumer receiving this triggers a rebalance
                    // ... if a topic has no routes left, the topic itself is removed, since no broker can serve it
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

Starting NamesrvController

This simply starts the Netty server.

Producer

RocketMQ offers three ways of sending messages: synchronous, asynchronous, and one-way. In terms of storage models, today's MQ middleware falls into two camps: those that persist messages and those that don't.

Let's first look at what the MQProducer interface provides.

public interface MQProducer extends MQAdmin {

    // start
    void start() throws MQClientException;

    // shutdown
    void shutdown();

    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

    // send a message synchronously
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    SendResult send(final Message msg, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    // send a message asynchronously
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;

    void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException;

    // send a message one-way; compared with async sending, there is simply no callback registered
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;

    // ... the various sync, async, and one-way send overloads

    // send a transactional message
    TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;

    TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException;

    // batch messages
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    // request-response like an RPC call, sync or async; here the producer expects a reply from the consumer
    Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    void request(final Message msg, final RequestCallback requestCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException, MQBrokerException;

    // ... the various request overloads
}

The interface has two implementations:

  1. DefaultMQProducer: the default implementation; does not implement the transactional-message methods.
  2. TransactionMQProducer: extends DefaultMQProducer and implements the transactional-message methods.

Messages

Message

public class Message implements Serializable {
    // basic property: topic
    private String topic;
    // basic property: message flag
    private int flag;
    // all extended properties live in the properties map
    private Map<String, String> properties;
    // basic property: message body
    private byte[] body;
    // transaction id; only present for transactional messages
    private String transactionId;

    // ...
}

Messages also carry some hidden attributes stored under fixed keys in the properties field; everyday application code should avoid using these keys:

  • TAGS: the message tag, used for message filtering
  • KEYS: message index keys
  • WAIT: whether the send should wait until the message is stored before returning
  • DELAY: the message delay level, used for scheduled messages and message retries

All of these keys are defined in the MessageConst constants class.

For some key attributes, the Message class provides a set of setters:

public void setTags(String tags) {...}
public void setKeys(Collection<String> keys) {...}
public void setDelayTimeLevel(int level) {...}
public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
public void setBuyerId(String buyerId) {...}
Property Setter Purpose
MessageConst.PROPERTY_TAGS setTags Messages can be filtered by tag at consumption time
MessageConst.PROPERTY_KEYS setKeys Business identifiers used in consumption logic or for message trace queries
MessageConst.PROPERTY_DELAY_TIME_LEVEL setDelayTimeLevel Delay level; each level maps to a different delay duration
MessageConst.PROPERTY_WAIT_STORE_MSG_OK setWaitStoreMsgOK With synchronous flush, whether to wait until data hits disk before the send counts as successful
MessageConst.PROPERTY_BUYER_ID setBuyerId Unused in the main source; only appears in test code

These fields are most likely kept in properties rather than as fields of Message itself because of how they are processed on the 【Broker】 side.
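The setters above are thin wrappers that write reserved keys into the properties map, which is why user code must avoid those keys. A minimal sketch (an illustrative class, not RocketMQ's Message; the key strings match the MessageConst values listed above):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of Message's setters writing reserved keys into properties.
public class MessagePropsSketch {
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    private final Map<String, String> properties = new HashMap<>();

    public void setTags(String tags) {
        properties.put(PROPERTY_TAGS, tags);
    }

    public void setDelayTimeLevel(int level) {
        properties.put(PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

    public String getProperty(String name) {
        return properties.get(name);
    }
}
```

A user property named "TAGS" or "DELAY" would silently collide with these reserved entries, which is exactly the pitfall the list above warns about.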

MessageExt

For the sender, the Message definition above is sufficient. But for RocketMQ's end-to-end processing, more fields are needed to record necessary details such as the message id, creation time, and store time.

public class MessageExt extends Message {

    // the broker hosting the queue this message belongs to
    private String brokerName;
    // message queue id
    private int queueId;
    // message store size
    private int storeSize;
    // offset of the message within the queue
    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;

    private long storeTimestamp;
    private SocketAddress storeHost;
    // message id
    private String msgId;
    // the physical offset of the message in the commit log
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;

    private long preparedTransactionOffset;

    // ...
}
Field Purpose
queueId The MessageQueue number; messages are sent to a MessageQueue under the Topic
storeSize The on-disk size of the message on the Broker
queueOffset The offset within the ConsumeQueue
sysFlag System flag bits; the flags are defined in MessageSysFlag
bornTimestamp Message creation time, set by the Producer when sending
storeHost The address of the Broker storing this message
msgId The message id, generated on the Broker
commitLogOffset The storage offset within the Broker's commit log
bodyCRC CRC checksum of the message body
reconsumeTimes Number of consumption retries
preparedTransactionOffset Transaction-related field

Message has a property in properties whose key is MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, generated by the 【Producer】 when the message is sent. The msgId in the table above, however, is generated via MessageDecoder.createMessageId when the message is stored on the 【Broker】, and is composed as follows:

public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    // broker address
    input.put(addr);
    // CommitLog offset
    input.putLong(offset);

    return UtilAll.bytes2string(input.array());
}
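For IPv4 the address buffer is 8 bytes (4-byte IP plus 4-byte port), so the id is 16 bytes, hex-encoded to 32 characters. A self-contained sketch of that layout (the helper below is illustrative, not RocketMQ's MessageDecoder):

```java
import java.nio.ByteBuffer;

// Sketch of the IPv4 msgId layout: 4-byte IP + 4-byte port + 8-byte
// commit log offset, hex-encoded into a 32-character string.
public class MsgIdSketch {
    public static String createMessageId(byte[] ip, int port, long commitLogOffset) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ip);                  // 4 bytes: broker IP
        buf.putInt(port);             // 4 bytes: broker port
        buf.putLong(commitLogOffset); // 8 bytes: offset in the commit log
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }
}
```

Because the broker address and commit log offset are embedded, the broker can locate the raw message directly from a msgId.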

Producer Startup

After creating a 【Producer】, and before using it to send messages, it must be started by calling its start() method:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // start DefaultMQProducerImpl
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

As we can see, this mainly starts DefaultMQProducerImpl:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // some configuration validation
            this.checkConfig();
            // unless this is the internal producer group, replace instanceName with the process id
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // create an MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // register this producer into MQClientInstance#producerTable
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // start the MQClientInstance
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        // any other state is an error, preventing duplicate starts
        default:
            break;
    }
    // send heartbeats to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // schedule a task to clean up timed-out requests
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

This in turn calls MQClientInstance#start():

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                // the consumer-side message pulling service
                this.pullMessageService.start();
                // Start rebalance service
                // triggers periodic rebalance between brokers and consumers
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

For the 【Producer】, the relevant parts are:

  1. Establishing the connection to the 【NameServer】
  2. Starting scheduled tasks that refresh 【NameServer】 info, 【Topic】 routing info, and so on
  3. Establishing and maintaining connections to the 【Broker】s

The remaining functionality mainly maintains the relationship between 【Broker】 and 【Consumer】, and is covered in detail in the consumer section.
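Both start() methods above use the same ServiceState guard, which can be sketched as follows (an illustrative class; the real code throws MQClientException instead of returning false). Moving through START_FAILED first means a crash mid-start is not mistaken for "never started":

```java
// Sketch of the ServiceState guard that prevents double starts:
// start() only succeeds from CREATE_JUST.
public class ServiceStateSketch {
    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState serviceState = ServiceState.CREATE_JUST;

    public boolean start() {
        switch (serviceState) {
            case CREATE_JUST:
                serviceState = ServiceState.START_FAILED; // pessimistic default
                // ... real code: checkConfig, register producer, start client instance
                serviceState = ServiceState.RUNNING;      // only on success
                return true;
            default:
                return false; // real code throws MQClientException here
        }
    }

    public ServiceState getServiceState() {
        return serviceState;
    }
}
```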

Message Sending

The message sending flow is:

  1. Validate the message, mainly checking its length.
  2. Look up the route: a 【Topic】 has multiple queues spread across different 【Broker】s, and the 【Producer】 must pick one queue to send to.
  3. Send the message (including the failure-handling logic).

Global sequence diagram of 【Producer】 message sending (figure omitted).

Let's take DefaultMQProducer#send as an example:

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // message validation
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // route lookup and the actual send both happen here
    return this.defaultMQProducerImpl.send(msg);
}

Message Validation

First, the validation logic:

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

As we can see, validation mainly covers the 【topic】 and the message structure.
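The body checks can be distilled into a tiny standalone sketch (an illustrative helper, not RocketMQ's Validators; it returns an error string instead of throwing MQClientException):

```java
// Sketch of the body checks in Validators.checkMessage:
// non-null, non-empty, and within the producer's maximum size.
public class MessageValidatorSketch {
    public static String checkBody(byte[] body, int maxMessageSize) {
        if (body == null) return "the message body is null";
        if (body.length == 0) return "the message body length is zero";
        if (body.length > maxMessageSize) return "the message body size over max value";
        return null; // valid
    }
}
```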

Route Lookup and Selection

Following the call chain down to DefaultMQProducerImpl#sendDefaultImpl:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // ensure the Producer is in the RUNNING state
    this.makeSureStateOK();
    // validate the message format
    Validators.checkMessage(msg, this.defaultMQProducer);
    // invocation id, used in the log lines below to mark one send attempt
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // route lookup
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // route selection: pick one queue from the queue list
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        // Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    /**
                     * Invoke the core send method
                     */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // update the Broker availability info
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // for these response codes, continue and retry the send
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        // otherwise, return the send result if there is one, or rethrow
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        // return the send result
        if (sendResult != null) {
            return sendResult;
        }
        // throw a different exception depending on the failure
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }
    // check the namesrv: if it is misconfigured/unreachable throw a namesrv error, otherwise throw "no route info"
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

This method does the following:

  1. Validation: checks the producer and the message again to be safe, even though some of these checks were already performed earlier.
  2. Route lookup and selection: obtains the relevant 【Broker】s and queues for the 【Topic】.
    DefaultMQProducerImpl#tryToFindTopicPublishInfo
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
     // try the local cache first
     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
     // if the cache is empty or stale, fetch from the namesrv
     if (null == topicPublishInfo || !topicPublishInfo.ok()) {
         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
         topicPublishInfo = this.topicPublishInfoTable.get(topic);
     }
     // found it, return
     if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
         return topicPublishInfo;
     } else {
         // on first use, fetch the topic metadata from the namesrv and cache it for later
         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
         topicPublishInfo = this.topicPublishInfoTable.get(topic);
         return topicPublishInfo;
     }
    }
    
  3. Select the queue to send the message to
    MQFaultStrategy#selectOneMessageQueue

    /**
    * Select a message queue.
    * 1. With fault tolerance enabled:
    *  1.1 If the queue is usable and this is the first send, or the previous send went to the same broker, pick a queue via random + round-robin
    *  1.2 Otherwise, pick the broker from the fault list that is most likely usable, and select one of its queues via random + round-robin
    * 2. With fault tolerance disabled:
    *  2.1 On the first send, pick a queue via random + round-robin
    *  2.2 Otherwise do the same while excluding, as far as possible, the broker that failed last time
    *
    * @param tpInfo         routing info
    * @param lastBrokerName null on the first send; set on retries
    * @return an available message queue
    */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
     if (this.sendLatencyFaultEnable) { // fault tolerance enabled
         try {
             // round-robin load balancing over the MessageQueue count
             int index = tpInfo.getSendWhichQueue().getAndIncrement();
             for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                 if (pos < 0)
                     pos = 0;
    
                 MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    
                 // check whether the broker hosting this queue is usable; if so return the queue, otherwise keep scanning
                 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                     // the queue is usable, and this is either the first send or the same broker as last time
                     if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                         return mq;
                 }
             }
             /*
             Still no queue found after the loop above
              */
             // pick the broker from the fault list that is most likely usable
             final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
             // get the broker's write queue count
             int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
             if (writeQueueNums > 0) { // it has writable queues
                 // random on the first call, round-robin on retries
                 final MessageQueue mq = tpInfo.selectOneMessageQueue();
                 if (notBestBroker != null) {
                     // point the selected queue at the chosen broker
                     mq.setBrokerName(notBestBroker);
                     // pick one of its write queues
                     mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                 }
                 return mq;
             } else {
                 // remove the fault entry, so the broker rejoins route selection
                 latencyFaultTolerance.remove(notBestBroker);
             }
         } catch (Exception e) {
             log.error("Error occurred when selecting message queue", e);
         }
    
         return tpInfo.selectOneMessageQueue();
     }
    
     // fault tolerance disabled
     return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    

    The comments above explain the selection process.
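The non-fault-tolerant retry path boils down to: a shared round-robin counter picks the next queue, and on retry the broker that just failed is skipped when an alternative exists. A self-contained sketch (illustrative classes, not RocketMQ's TopicPublishInfo):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin queue selection that avoids the last failed broker.
public class QueueSelectSketch {
    static class Queue {
        final String brokerName;
        final int queueId;
        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    public Queue selectOneMessageQueue(List<Queue> queues, String lastBrokerName) {
        if (lastBrokerName != null) {
            // retry: scan round-robin, skipping the broker that failed last time
            for (int i = 0; i < queues.size(); i++) {
                int pos = Math.abs(sendWhichQueue.getAndIncrement()) % queues.size();
                Queue q = queues.get(pos);
                if (!q.brokerName.equals(lastBrokerName)) {
                    return q;
                }
            }
        }
        // first send, or no alternative broker: plain round-robin
        int pos = Math.abs(sendWhichQueue.getAndIncrement()) % queues.size();
        return queues.get(pos);
    }
}
```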

Alternatively, we can use an overload of DefaultMQProducer#send:

public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    return this.defaultMQProducerImpl.send(msg, selector, arg);
}

With this overload, the queue is chosen not by the selectOneMessageQueue method described above but by the MessageQueueSelector passed in.
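The classic use of a selector is ordered messages: hash a business key (the arg parameter, e.g. an order id) over the queue list so that equal keys always land on the same queue. A sketch of that selection logic (an illustrative helper, not RocketMQ's MessageQueueSelector interface itself):

```java
import java.util.List;

// Sketch of the selector pattern for ordered messages: equal keys
// deterministically map to the same queue.
public class SelectorSketch {
    public static int selectByHash(List<Integer> queueIds, Object orderKey) {
        int index = Math.abs(orderKey.hashCode()) % queueIds.size();
        return queueIds.get(index);
    }
}
```

Because all messages sharing a key go to one queue, and a queue is consumed by one consumer at a time, per-key ordering is preserved end to end.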

Sending the Message

DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // ...
            // if this is not a batch message, generate a unique key
            if (!(msg instanceof MessageBatch)) {
                // set the unique key
                MessageClientIDSetter.setUniqID(msg);
            }
            // ... message compression
            // ... transactional-message handling
            // ... pre-send checks
            // ... build the send request
            // send the message
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    // ...
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    // ...
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // 发送消息后逻辑
    // ...
}

MQClientAPIImpl#sendMessage的主要作用就是:

  1. 消息发送(同步、异步、单向选择不同的发送方式)
  2. 解析response

发送消息后会执行LatencyFaultToleranceImpl#updateFaultItem,根据本次发送耗时更新【Broker】的延迟信息,在【Producer】本地维护每个【Broker】的可用性,作用于之后发送消息时的队列选择。
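可以用一段独立的示意代码理解这种在本地维护【Broker】可用性的思路(`LatencyTracker`及其中的规避阈值均为假设,并非源码中的latencyMax对照表):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// 示意代码:模拟LatencyFaultToleranceImpl的思路——
// 发送耗时越长,Broker被规避的时间越久(阈值为演示假设)
public class LatencyTracker {
    private final Map<String, Long> notAvailableUntil = new ConcurrentHashMap<>();

    // 根据本次发送耗时计算规避时长并记录
    void updateFaultItem(String brokerName, long latencyMillis, long nowMillis) {
        long backoff = latencyMillis >= 1000 ? 30_000 : 0; // 假设:耗时超过1s则规避30s
        notAvailableUntil.put(brokerName, nowMillis + backoff);
    }

    // 选队列前判断Broker是否已过规避期
    boolean isAvailable(String brokerName, long nowMillis) {
        Long until = notAvailableUntil.get(brokerName);
        return until == null || nowMillis >= until;
    }

    public static void main(String[] args) {
        LatencyTracker t = new LatencyTracker();
        t.updateFaultItem("broker-a", 2000, 0);          // 慢响应,被规避
        System.out.println(t.isAvailable("broker-a", 10_000)); // false,仍在规避期
        System.out.println(t.isAvailable("broker-a", 40_000)); // true,规避期已过
    }
}
```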

Broker

消息中转⻆⾊,负责存储消息、转发消息。【Broker】在RocketMQ系统中负责接收从⽣产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

broker结构

启动类

public class BrokerStartup {
    ...
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    ...
}

createBrokerController

创建BrokerController的流程可看作以下几步:

  1. 设置NettyServer和NettyClient的缓冲区大小
  2. 命令⾏参数的处理,解析配置文件到BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig
  3. 解析【NameServer】地址
  4. 【Broker】⻆⾊的处理
  5. 日志处理
  6. 创建BrokerController实例
  7. 执行BrokerController初始化方法
  8. 注册优雅关闭shutdownHook

设置缓冲区大小

// 设置Netty的发送缓冲区的大小
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
    NettySystemConfig.socketSndbufSize = 131072;
}

// 设置Netty接收缓冲区的大小。
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
    NettySystemConfig.socketRcvbufSize = 131072;
}
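这段判空逻辑的含义是:仅当用户没有通过-D参数显式指定时,才写入默认值131072(即128KB)。可以用一段示意代码还原这个模式(属性名为演示用假设):

```java
// 示意代码:仅当用户未通过-D指定时才使用默认值,
// 与BrokerStartup设置Netty缓冲区大小的判空逻辑一致(属性名为假设示例)
public class DefaultIfUnset {
    static int resolve(String key, int defaultValue) {
        String v = System.getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        // 未设置时取默认值131072
        System.out.println(resolve("demo.socket.sndbuf.size", 131072));
        // 用户显式指定后,以指定值为准
        System.setProperty("demo.socket.sndbuf.size", "65536");
        System.out.println(resolve("demo.socket.sndbuf.size", 131072));
    }
}
```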

读取配置并写入Config对象

两种方式:

  1. 通过-c命令读取配置文件,从配置文件获取配置属性
  2. 通过命令行将配置直接写到命令行中,从命令行读取配置属性
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
    new PosixParser());
if (null == commandLine) {
    System.exit(-1);
}
/**
 * 创建配置对象
 */
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();

nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// 设置nettyServerConfig监听端口号
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
/**
 * 填充配置文件
 */
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        configFile = file;
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);

        properties2SystemEnv(properties);
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        MixAll.properties2Object(properties, nettyClientConfig);
        MixAll.properties2Object(properties, messageStoreConfig);

        BrokerPathConfigHelper.setBrokerConfigPath(file);
        in.close();
    }
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
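MixAll.properties2Object的核心思路是反射遍历目标对象的setter,用同名配置项赋值。下面是一个简化的示意实现(`PropsBinder`、`DemoConfig`均为演示用命名,仅处理int和String两种类型):

```java
import java.lang.reflect.Method;
import java.util.Properties;

// 示意代码:模拟MixAll.properties2Object的核心思路——
// 遍历目标对象的setter,用同名配置项反射赋值(仅演示int/String两种类型)
public class PropsBinder {
    public static class DemoConfig {
        private int listenPort = 10911;
        private String brokerName = "";
        public void setListenPort(int p) { this.listenPort = p; }
        public int getListenPort() { return listenPort; }
        public void setBrokerName(String n) { this.brokerName = n; }
        public String getBrokerName() { return brokerName; }
    }

    static void bind(Properties props, Object target) {
        try {
            for (Method m : target.getClass().getMethods()) {
                if (!m.getName().startsWith("set") || m.getParameterCount() != 1) continue;
                // setListenPort -> listenPort
                String key = Character.toLowerCase(m.getName().charAt(3)) + m.getName().substring(4);
                String value = props.getProperty(key);
                if (value == null) continue;
                Class<?> type = m.getParameterTypes()[0];
                if (type == int.class) m.invoke(target, Integer.parseInt(value));
                else if (type == String.class) m.invoke(target, value);
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("listenPort", "10920");
        p.setProperty("brokerName", "broker-a");
        DemoConfig cfg = new DemoConfig();
        bind(p, cfg);
        System.out.println(cfg.getListenPort() + " " + cfg.getBrokerName()); // 10920 broker-a
    }
}
```

这也解释了为什么配置文件中的key必须和Config类的属性名一致。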

在配置期间有三个端口的监听配置:

  1. NettyServer的端口监听
    // 设置nettyServerConfig监听端口号
    nettyServerConfig.setListenPort(10911);
    
  2. 消息存储配置haService监听端口
    /**
    * haListenPort是haService中使用
    * 默认值为:listenPort + 1
    */
    messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
    
  3. BrokerController#initialize方法中,配置了主从同步的服务,监听端口
    /**
    * 主要用于slave同master同步。
    *
    * fastListenPort
    * 主要是fastRemotingServer服务使用
    * 默认为:listenPort - 2
    */
    fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
    
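按上面的描述,另外两个端口都可以由listenPort推导出来,用一小段代码直观验证(`BrokerPorts`为演示用命名):

```java
// 示意代码:根据正文描述,由listenPort推导另外两个端口
public class BrokerPorts {
    static int haListenPort(int listenPort)   { return listenPort + 1; } // haService,默认listenPort + 1
    static int fastListenPort(int listenPort) { return listenPort - 2; } // fastRemotingServer,默认listenPort - 2

    public static void main(String[] args) {
        int listenPort = 10911; // nettyServerConfig默认监听端口
        System.out.println(haListenPort(listenPort));   // 10912
        System.out.println(fastListenPort(listenPort)); // 10909
    }
}
```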

获取NameServer地址

// 获取nameserver地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
    try {
        // nameserver地址如果为集群,按';'分隔后逐个校验
        String[] addrArray = namesrvAddr.split(";");
        for (String addr : addrArray) {
            RemotingUtil.string2SocketAddress(addr);
        }
    } catch (Exception e) {
        System.out.printf(
            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
            namesrvAddr);
        System.exit(-3);
    }
}
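上面的校验逻辑可以抽象成下面的示意代码:按';'拆分集群地址,逐个检查ip:port格式(`NamesrvAddrCheck`为演示用命名,源码中实际通过RemotingUtil.string2SocketAddress抛异常来发现非法地址):

```java
// 示意代码:校验NameServer地址串——
// 集群地址以';'分隔,每项需符合 "ip:port" 格式
public class NamesrvAddrCheck {
    static boolean isValid(String namesrvAddr) {
        try {
            for (String addr : namesrvAddr.split(";")) {
                String[] parts = addr.split(":");
                if (parts.length != 2) return false;
                int port = Integer.parseInt(parts[1]);
                if (port < 0 || port > 65535) return false;
            }
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("127.0.0.1:9876;192.168.0.1:9876")); // true
        System.out.println(isValid("127.0.0.1"));                       // false,缺少端口
    }
}
```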

处理Broker的角色(Master or Slave)

/**
 * 主要是处理在集群条件下的Broker角色的,
 * 从这里看出来brokerId为0的为Master节点,其他的为Slave节点。
 */
switch (messageStoreConfig.getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        //brokerId为0的为Master节点
        brokerConfig.setBrokerId(MixAll.MASTER_ID);
        break;
    case SLAVE:
        //Slave节点的brokerId必须大于0
        if (brokerConfig.getBrokerId() <= 0) {
            System.out.printf("Slave's brokerId must be > 0");
            System.exit(-3);
        }

        break;
    default:
        break;
}

日志配置

实例化BrokerController

// 创建BrokerController
final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

BrokerController中维护了很多线程池和每个线程池相关的排队队列,还有一些事件监听器。

执行BrokerController的初始化方法

// 初始化
boolean initResult = controller.initialize();

初始化方法有很多流程,我们分步来解析。

1. 将rebalance元数据信息读取到缓存
// rebalance元数据,从文件中读取到每个Manager的缓存中
// 维护队列信息
boolean result = this.topicConfigManager.load();
// 下面三个共同维护消费者组信息
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
2. 创建MessageStore并初始化
if (result) {
    try {
        this.messageStore =
            new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                this.brokerConfig);
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
            ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
        }
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        // load plugin
        MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
        this.messageStore = MessageStoreFactory.build(context, this.messageStore);
        this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    } catch (IOException e) {
        result = false;
        log.error("Failed to initialize", e);
    }
}

// 分配commitLog数据到consumeQueue在这里
result = result && this.messageStore.load();

MessageStore消息存储后面会详细讲,这里先做一个引入。

3. 创建NettyServer用来接收消息或请求

此处会创建两个NettyServer

  1. remotingServer:用于收发消息和请求
  2. fastRemotingServer:用于主从同步
4. 创建多个线程池

【Broker】是RocketMQ的核心,其有很多功能,【Broker】通过维护很多线程池来将每个功能很好的隔离开来,这些线程池就维护在BrokerController中。

this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_"));

this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getPullMessageThreadPoolNums(),
    this.brokerConfig.getPullMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.pullThreadPoolQueue,
    new ThreadFactoryImpl("PullMessageThread_"));

this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.replyThreadPoolQueue,
    new ThreadFactoryImpl("ProcessReplyMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    this.brokerConfig.getQueryMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.queryThreadPoolQueue,
    new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
        "AdminBrokerThread_"));

this.clientManageExecutor = new ThreadPoolExecutor(
    this.brokerConfig.getClientManageThreadPoolNums(),
    this.brokerConfig.getClientManageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.clientManagerThreadPoolQueue,
    new ThreadFactoryImpl("ClientManageThread_"));

this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getHeartbeatThreadPoolNums(),
    this.brokerConfig.getHeartbeatThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.heartbeatThreadPoolQueue,
    new ThreadFactoryImpl("HeartbeatThread_", true));

this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getEndTransactionThreadPoolNums(),
    this.brokerConfig.getEndTransactionThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.endTransactionThreadPoolQueue,
    new ThreadFactoryImpl("EndTransactionThread_"));

this.consumerManageExecutor =
    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
        "ConsumerManageThread_"));
5. 注册处理器

在第三步我们创建了两个NettyServer。注册处理器,其实就是通过不同的RequestCode告诉NettyServer收到的是哪类请求,比如:发送消息请求、拉取消息请求、心跳请求等,NettyServer再根据RequestCode把请求分派给对应的Processor处理。第四步中【Broker】通过不同的线程池将各类动作隔离,所以注册时也要传入对应的线程池,最终的映射关系维护在NettyRemotingAbstract#processorTable缓存中。

public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /**
     * ReplyMessageProcessor
     */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
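processorTable的分派思路可以用一段精简的示意代码表达:按RequestCode查表找到处理器,未注册的走默认处理器(真实实现中表的value还携带对应线程池并异步执行,此处简化为同步;RequestCode取值为演示假设):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// 示意代码:模拟NettyRemotingAbstract#processorTable的查表分派思路,
// 真实实现中value还带有对应的线程池,此处省略并同步执行
public class RequestDispatcher {
    static final int SEND_MESSAGE = 10; // 假设的RequestCode取值,仅作演示
    static final int PULL_MESSAGE = 11;

    private final Map<Integer, Function<String, String>> processorTable = new HashMap<>();
    // 对应registerDefaultProcessor:未注册的RequestCode走默认处理器
    private final Function<String, String> defaultProcessor = req -> "UNSUPPORTED:" + req;

    void registerProcessor(int code, Function<String, String> processor) {
        processorTable.put(code, processor);
    }

    String dispatch(int code, String request) {
        // 按RequestCode查表,未注册则交给默认处理器
        return processorTable.getOrDefault(code, defaultProcessor).apply(request);
    }

    public static void main(String[] args) {
        RequestDispatcher d = new RequestDispatcher();
        d.registerProcessor(SEND_MESSAGE, req -> "stored:" + req);
        d.registerProcessor(PULL_MESSAGE, req -> "pulled:" + req);
        System.out.println(d.dispatch(SEND_MESSAGE, "msg-1")); // stored:msg-1
        System.out.println(d.dispatch(99, "msg-2"));           // UNSUPPORTED:msg-2
    }
}
```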
6. 启动定时任务

这里定时任务有多个:

  1. 每天0点打印前一天消息队列数量
  2. 定时刷新rebalance元数据
  3. 维护【consumer】集群
  4. 更新【NameServer】集群
  5. 更新【Broker】集群(主从)
    ...
7. 其他

除上面几点之外,initialize方法还初始化了事务消息配置、权限管理、和一些自定义rpc动作。

添加钩子函数,JVM在退出的时候进行资源释放

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);

    @Override
    public void run() {
        synchronized (this) {
            log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
            if (!this.hasShutdown) {
                this.hasShutdown = true;
                long beginTime = System.currentTimeMillis();
                controller.shutdown();
                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
            }
        }
    }
}, "ShutdownHook"));

启动BrokerController

除了启动服务外,还会启动【Broker】向【NameServer】发送心跳的定时器。

/**
 * 默认每隔30s 向集群中所有的NameServer 发送心跳包,
 * NameServer 收到Broker 心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo 的lastUpdateTimestamp,
 * 然后NameServer 每隔10s扫描brokerLiveTable,如果连续120s 没有收到心跳包,
 * NameServer 将移除该Broker 的路由信息同时关闭Socket 连接。
 */
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
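注意心跳周期被Math.max(10000, Math.min(period, 60000))钳制在10s到60s之间,用一段小代码验证这个区间(`HeartbeatPeriod`为演示用命名):

```java
// 示意代码:源码中心跳周期被钳制在[10s, 60s]区间——
// Math.max(10000, Math.min(registerNameServerPeriod, 60000))
public class HeartbeatPeriod {
    static long clamp(long configuredMillis) {
        return Math.max(10_000, Math.min(configuredMillis, 60_000));
    }

    public static void main(String[] args) {
        System.out.println(clamp(5_000));   // 10000,过小被抬到下限
        System.out.println(clamp(30_000));  // 30000,区间内原样生效
        System.out.println(clamp(120_000)); // 60000,过大被压到上限
    }
}
```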

Broker向NameServer发起注册请求

BrokerController#doRegisterBrokerAll

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {

    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        //broker启动时,如果broker为slave角色,start()中会设置masterAddress
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            //这个是设置broker元数据同步时的master broker地址
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

BrokerOuterAPI

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    //获取所有nameServer地址
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        //创建请求头
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        //构建请求体
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        //遍历nameserver地址
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //分别给每个nameserver注册broker信息
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway,timeoutMills,requestHeader,body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    //异步的方式,不接收返回值
    if (oneway) {
        try {
            //netty客户端发送心跳
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
    //同步方式,接收返回值
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    //解析注册结果
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
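registerBrokerAll中的并行扇出加CountDownLatch限时等待,是一个通用的并发模式,下面用一段独立的示意代码还原它(`FanOutRegister`为演示用命名,`registerAll`中的字符串拼接代表一次注册RPC):

```java
import java.util.List;
import java.util.concurrent.*;

// 示意代码:与registerBrokerAll相同的并发扇出模式——
// 向每个NameServer并行发起注册,用CountDownLatch限时等待全部完成
public class FanOutRegister {
    static List<String> registerAll(List<String> nameServers, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(nameServers.size());
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        List<String> results = new CopyOnWriteArrayList<>();
        for (String addr : nameServers) {
            pool.execute(() -> {
                try {
                    results.add("registered@" + addr); // 此处代表一次注册RPC调用
                } finally {
                    latch.countDown(); // 无论成败都countDown,避免await空等
                }
            });
        }
        try {
            // 与源码一致:最多等待timeoutMills,超时也返回已收集到的结果
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) {
        System.out.println(registerAll(List.of("ns-a:9876", "ns-b:9876", "ns-c:9876"), 3000).size());
    }
}
```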

⽂件存储路径

根存储路径如下:

public class MessageStoreConfig {
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
}

可在配置文件中配置:

# 存储路径
storePathRootDir=/Users/**/store

文件目录结构

Broker文件目录结构

下⾯对上⾯的⽂件进⾏逐⼀介绍:

  • commitlog: 消息存储⽬录,存储的消息内容
  • config: 运⾏期间⼀些配置信息,主要包括如下信息:
    • consumerFilter.json: 主题消息过滤信息
    • consumerOffset.json: 集群消费模式消息消费进度
    • delayOffset.json: 延时消息队列拉取进度
    • subscriptionGroup.json: 消息消费组配置信息
    • topic.json: topic配置信息
  • consumequeue: 消息消费队列存储⽬录,存储的是消息在commitlog中的索引和大小
  • index: 消息索引⽂件存储⽬录,按key、tag、时间等存储,用于消息查询
  • abort: 如果存在abort⽂件说明【Broker】⾮正常关闭,该⽂件默认启动时候创建,正常退出之前删除
  • checkpoint: ⽂件检测点,存储commitlog⽂件最后⼀次刷盘时间戳、consumequeue最后⼀次刷盘时间、index索引⽂件最后⼀次刷盘时间戳

Broker接收Producer发送的消息

SendMessageProcessor#asyncProcessRequest

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                              RemotingCommand request) throws RemotingCommandException {
    // ...
            if (requestHeader.isBatch()) {
                // 处理批量消息
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 其他消息
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

SendMessageProcessor#asyncSendMessage

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    // ...预处理等

    // 消息拼装 start
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    // 消息拼装 end

    CompletableFuture<PutMessageResult> putMessageResult = null;

    // ...处理事务消息
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
    // 处理普通消息
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

DefaultMessageStore#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // 检查当前服务状态,身份(主或从)等
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // 检查消息topic和propertiesString长度
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    // ...
    return putResultFuture;
}

CommitLog#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // ...

        // 延迟消息处理
        if (msg.getDelayTimeLevel() > 0) {
            // 如果设置的级别超过了最大级别,重置延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 修改Topic的投递目标为内部主题SCHEDULE_TOPIC_XXXX
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 根据delayLevel,确定将消息投递到SCHEDULE_TOPIC_XXXX内部的哪个队列中
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 记录下原始topic和queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            // 重新计算propertiesString
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // 获得最新的MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // 写消息加锁
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);
        // 当不存在映射文件时,进行创建
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // 消息写入MappedFile
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE: // 当前MappedFile已经写满了,两种情况,要么就是刚好消息能把最后空间写满(当前消息+8(totalSize+CommitLog.BLANK_MAGIC_CODE)==所剩空间)
                              // 否则最后一定是一个totalSize(int) + CommitLog.BLANK_MAGIC_CODE(int) + 一串0结尾
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                // ...
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED: // 消息内容超过4M
            case PROPERTIES_SIZE_EXCEEDED: // properties内容太多
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            // ...
    } finally {
        // 消息写完,解锁
        putMessageLock.unlock();
    }
    // ...

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // 根据配置同步或者异步刷盘,asyncPutMessage和putMessage两个方法的区别就是此处是使用多线程
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    // 主从同步,asyncPutMessage和putMessage两个方法的区别就是此处是使用多线程
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    // 处理两个future(combine)
}

其中有3点比较重要:

  1. mappedFileQueue.getLastMappedFile创建MappedFile
  2. mappedFile.appendMessage(msg, this.appendMessageCallback)消息写入MappedFile
  3. 数据刷盘和主从同步
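上面代码中延迟消息的改投逻辑可以用一段示意代码概括:级别超限时重置为最大级别,队列号即delayLevel - 1,每个延迟级别独占SCHEDULE_TOPIC_XXXX中的一个队列(默认延迟级别表共18级,从1s到2h):

```java
// 示意代码:延迟消息改投内部主题SCHEDULE_TOPIC_XXXX时的队列映射,
// delayLevel2QueueId即delayLevel - 1,每个延迟级别独占一个队列
public class DelayLevelMapping {
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static int capLevel(int delayLevel, int maxDelayLevel) {
        // 超过最大级别时重置为最大级别,与asyncPutMessage中的处理一致
        return Math.min(delayLevel, maxDelayLevel);
    }

    public static void main(String[] args) {
        int maxDelayLevel = 18; // 默认延迟级别表共18级(1s 5s 10s ... 2h)
        int level = capLevel(20, maxDelayLevel);
        System.out.println(level);                     // 18
        System.out.println(delayLevel2QueueId(level)); // 17
    }
}
```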

先来看看【commitlog】的结构

CommitLog

(figure: CommitLog structure)
A Broker has only one CommitLog file (group): RocketMQ stores the messages of every topic in this same file, appending one Message after another in strict order.

Creating a MappedFile

If the latest MappedFile is null or already full, a new MappedFile is created.

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    // start offset of the file to be created
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();
    // no mapped file exists yet - this is the first message
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    // the last file is full
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        // new file name = previous file's start offset + file size (1 GB by default)
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }
    // create the file
    if (createOffset != -1 && needCreate) {
        // ...
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
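The createOffset arithmetic above also fixes the file name: CommitLog files are named after their 20-digit, zero-padded start offset. A standalone sketch (the 1 GB size is the default mappedFileSizeCommitLog; the class name is illustrative):

```java
public class CommitLogFileNames {
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, the default CommitLog file size

    // Align an arbitrary start offset down to a file boundary (the first-message case above).
    static long createOffset(long startOffset) {
        return startOffset - (startOffset % MAPPED_FILE_SIZE);
    }

    // CommitLog file names are the 20-digit, zero-padded start offset of the file.
    static String fileName(long offset) {
        return String.format("%020d", offset);
    }

    public static void main(String[] args) {
        System.out.println(fileName(createOffset(0)));                    // 00000000000000000000
        System.out.println(fileName(createOffset(5) + MAPPED_FILE_SIZE)); // 00000000001073741824
    }
}
```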

Writing the message into the MappedFile

MappedFile#appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    // ...
    // current write position in the file
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) { // the file still has room
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        // set the write position, perform the append, then advance wrotePosition (where the next write starts)
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            // perform the actual append
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes()); // advance the write pointer
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

CommitLog.DefaultAppendMessageCallback#doAppend

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                    final MessageExtBrokerInner msgInner) {
    // 1. the CommitLog offset of this message
    long wroteOffset = fileFromOffset + byteBuffer.position();

    // choose storeHostLength for IPv4 vs IPv6
    // 2. hostHolder carries the broker address
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    String msgId;
    // 3. build the msgId
    if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
        msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    } else {
        msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    }

    // ... look up the target queue's offset in CommitLog.this.topicQueueTable

    // ... process the message's properties, topic and body
    // compute msgLen from the properties, topic and body lengths
    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // messages larger than 4 MB are rejected
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // check for enough space: current message + 8 bytes (every message needs at least totalSize(int) + MAGIC_CODE(int)) must fit in the MappedFile's remaining space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        // ... not enough space: write a blank placeholder instead (remaining size (int) + CommitLog.BLANK_MAGIC_CODE (int) + zeros)
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // reset the buffer
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(bornHostHolder, bornHostLength);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    // ...
    return result;
}
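The 17 fields written above determine the total message length. A sketch of the arithmetic behind calMsgLength, with field widths taken from the write sequence (IPv4 host lengths assumed; the real method also handles the longer IPv6 addresses):

```java
public class MsgLength {
    // Mirrors the fixed-width fields written in steps 1-17 above (IPv4 hosts).
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4      // 1 TOTALSIZE
             + 4      // 2 MAGICCODE
             + 4      // 3 BODYCRC
             + 4      // 4 QUEUEID
             + 4      // 5 FLAG
             + 8      // 6 QUEUEOFFSET
             + 8      // 7 PHYSICALOFFSET
             + 4      // 8 SYSFLAG
             + 8      // 9 BORNTIMESTAMP
             + 8      // 10 BORNHOST (IPv4 address + port)
             + 8      // 11 STORETIMESTAMP
             + 8      // 12 STOREHOSTADDRESS (IPv4 address + port)
             + 4      // 13 RECONSUMETIMES
             + 8      // 14 Prepared Transaction Offset
             + 4 + bodyLength        // 15 BODY (int length prefix + content)
             + 1 + topicLength       // 16 TOPIC (1-byte length prefix)
             + 2 + propertiesLength; // 17 PROPERTIES (2-byte length prefix)
    }

    public static void main(String[] args) {
        // even an empty message occupies the 91-byte fixed layout
        System.out.println(calMsgLength(0, 0, 0)); // 91
    }
}
```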

Flushing and master-slave replication

Flushing falls into three cases:

  1. synchronous flush
  2. asynchronous flush with the transient store pool (writeBuffer) disabled
  3. asynchronous flush with the transient store pool enabled

Each corresponds to one implementation class:
(figure: class diagram of the CommitLog flush services)

| Implementation | Scenario | Flush behavior |
| --- | --- | --- |
| CommitRealTimeService | async flush, transient store pool enabled | only commits; the checkpoint drives the actual flush, riding on the page cache |
| FlushRealTimeService | async flush, transient store pool disabled | a background thread flushes after a delay; this path only compares positions and wakes it |
| GroupCommitService | synchronous flush | flushes synchronously before returning |
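The table above boils down to a small decision. A sketch of the selection logic (illustrative names; note that in the actual CommitLog constructor the commit service runs alongside the flush service when the transient store pool is enabled, rather than replacing it):

```java
public class FlushServicePick {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // Which service handles a write, per the three cases above:
    // SYNC_FLUSH -> GroupCommitService; ASYNC_FLUSH -> FlushRealTimeService,
    // or CommitRealTimeService when the transient store pool is enabled.
    static String pick(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) return "GroupCommitService";
        return transientStorePoolEnable ? "CommitRealTimeService" : "FlushRealTimeService";
    }

    public static void main(String[] args) {
        System.out.println(pick(FlushDiskType.SYNC_FLUSH, false));  // GroupCommitService
        System.out.println(pick(FlushDiskType.ASYNC_FLUSH, true));  // CommitRealTimeService
    }
}
```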

When the Broker's BrokerController runs initialize at startup, it calls DefaultMessageStore#load:

public boolean load() {
    boolean result = true;

    try {
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        if (null != scheduleMessageService) {
            // load the delay levels from the config file into the cache
            result = result && this.scheduleMessageService.load();
        }

        // load Commit Log
        result = result && this.commitLog.load();

        // load Consume Queue
        result = result && this.loadConsumeQueue();

        if (result) {
            // the checkpoint that drives flushing
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // index files
            this.indexService.load(lastExitOK);

            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

StoreCheckpoint is implemented like this:

this.randomAccessFile = new RandomAccessFile(file, "rw");
this.fileChannel = this.randomAccessFile.getChannel();
// the checkpoint maps one OS page and flushes per page of content, riding on the page cache
this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);

As you can see, RocketMQ leverages the page cache to make flushing with CommitRealTimeService efficient.

Broker handling of message pull requests

ConsumeQueue

To speed up message lookup in the ConsumeQueue and save disk space, a ConsumeQueue storage unit does not store the full message; its layout is shown below:
(figure: ConsumeQueue storage unit)

ConsumeQueues are likewise built while the Broker starts up in DefaultMessageStore#load; new ones are of course also created as messages are produced.

A ConsumeQueue is best examined from two angles:

  1. how each storage unit is written into the ConsumeQueue
  2. how a Consumer consumes messages from it

Writing into the ConsumeQueue

ConsumeQueue#putMessagePositionInfoWrapper

boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());

ConsumeQueue#putMessagePositionInfo

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    // write the 20-byte unit: CommitLogOffset(8) + MessageSize(4) + TagsCode(8)
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // ...
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
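The 20-byte unit and the logical-offset arithmetic above can be exercised on their own. A minimal sketch mirroring putMessagePositionInfo (class name illustrative):

```java
import java.nio.ByteBuffer;

public class CqUnit {
    static final int CQ_STORE_UNIT_SIZE = 20; // CommitLogOffset(8) + MessageSize(4) + TagsCode(8)

    // Encode one ConsumeQueue storage unit exactly as putMessagePositionInfo does.
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    // A consume offset (the message's index in the queue) maps linearly to a byte offset.
    static long expectLogicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(encode(0L, 128, 0L).length); // 20
        System.out.println(expectLogicOffset(3));       // 60
    }
}
```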

Pulling from the ConsumeQueue

On the Broker side, a pull request sent by a Consumer is handled in PullMessageProcessor#processRequest.

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    // ...build the pull response from the request header sent by the client

    // ...various preliminary validations

    // fetch the messages
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        // tell the consumer where the next pull should start
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // ...if the consume lag is too large, tell the consumer to pull from a slave next time

        // ...map getMessageResult's status to a response code, etc.

        // ...consume-message hooks, mainly per-response-code context handling

        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                // success path: update statistics and fill the response body
                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    getMessageResult.getMessageCount());

                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    getMessageResult.getBufferTotalSize());

                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(r);
                }
                // ...
                break;
            case ResponseCode.PULL_NOT_FOUND:
                // may this broker suspend the request?
                if (brokerAllowSuspend && hasSuspendFlag) {
                    // the long-polling timeout
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    // if long polling is disabled, fall back to short polling; shortPollingTimeMills defaults to 1 second
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    // build the long-polling PullRequest from the request, the NIO channel, the polling time, the current store timestamp, the pull offset, the subscription and the message filter
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    // set the response to null so nothing is returned to the client for now
                    response = null;
                    break;
                }
            // ...handling of the other status codes
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }

    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        // the broker keeps track of the consumeOffset
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}

Two steps matter here:

  1. fetching the messages via DefaultMessageStore#getMessage
  2. when the Broker has no message to return, RocketMQ calls PullRequestHoldService to hold the request instead of responding to the client - this is the core of long polling.

Fetching the messages
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums, final MessageFilter messageFilter) {
    // ...validate the broker's state

    // ...initialize the pull status and other locals

    final long maxOffsetPy = this.commitLog.getMaxOffset();
    // 1. locate the ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        // ...offset sanity checks and error cases
        } else {
            // 2. read the index units from the ConsumeQueue; they locate the messages inside the CommitLog
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                try {
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;

                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    long maxPhyOffsetPulling = 0;

                    int i = 0;
                    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // parse one storage unit read from the ConsumeQueue
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                        // ...

                        // message filtering
                        if (messageFilter != null
                            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }

                            continue;
                        }
                        // read the message from the CommitLog
                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }

                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }

                        // ...
                    }

                    // If the backlog is too large, suggest switching to a slave for the next pull.
                    // maxOffsetPy is the current max physical offset, maxPhyOffsetPulling the max physical offset of this pull; their difference is the backlog.
                    // TOTAL_PHYSICAL_MEMORY_SIZE is the machine's physical memory and accessMessageInMemoryMaxRatio defaults to 40,
                    // so this checks whether the backlog exceeds 40% of physical memory; if so, suggestPullingFromSlave is set to true.
                    long diff = maxOffsetPy - maxPhyOffsetPulling;
                    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                    getResult.setSuggestPullingFromSlave(diff > memory);
                } finally {
                    bufferConsumeQueue.release();
                }
            // ...
}

Fetching a message takes three steps:

  1. locate the ConsumeQueue for the request
  2. read the storage unit from the ConsumeQueue
  3. use that unit to read the message content from the CommitLog

Locating the ConsumeQueue

The ConsumeQueue is located by Topic and queueId.

Reading the storage unit from the ConsumeQueue

The consume offset passed in is first mapped to the corresponding storage unit in the ConsumeQueue, which in turn locates the message in the CommitLog.
ConsumeQueue#getIndexBuffer

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    // size of a ConsumeQueue file
    int mappedFileSize = this.mappedFileSize;
    // map the consume progress to a byte offset within the ConsumeQueue
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // the ConsumeQueue mapped file
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // return a slice of the file's content
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}
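The two-step arithmetic above (consume progress to logical offset, then position within one mapped file) can be checked standalone. A sketch assuming the default ConsumeQueue file size of 300000 units (class and method names illustrative):

```java
public class CqLookup {
    static final int CQ_STORE_UNIT_SIZE = 20;
    // default mappedFileSizeConsumeQueue: 300000 units per file
    static final int MAPPED_FILE_SIZE = 300000 * CQ_STORE_UNIT_SIZE;

    // startIndex is the consume progress (a message count); returns {fileStartOffset, positionInFile}
    static long[] locate(long startIndex) {
        long offset = startIndex * CQ_STORE_UNIT_SIZE;          // logical byte offset
        long fileStart = offset - (offset % MAPPED_FILE_SIZE);  // which file it falls into
        return new long[] { fileStart, offset % MAPPED_FILE_SIZE };
    }

    public static void main(String[] args) {
        long[] r = locate(300001); // one unit past the first file
        System.out.println(r[0] + " " + r[1]); // 6000000 20
    }
}
```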
Reading the message from the CommitLog

With the CommitLogOffset and MessageSize obtained above, a message can be located directly.
CommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    //step 1: get the CommitLog file size
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    //step 2: map the offset to the concrete commitlog mapped file
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    //step 3: read the message
    if (mappedFile != null) {
        //slice out the message content using its offset and length
        int pos = (int) (offset % mappedFileSize);

        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}
Relieving Broker pressure

As the final comments of the snippet above explain:
if this Broker's backlog (the CommitLog's max offset minus the offset consumed so far) exceeds 40% of physical memory, the Consumer is told to pull from a slave next time.
Server-side long polling

PullRequestHoldService#suspendPullRequest above placed the PullRequest into PullRequestHoldService.pullRequestTable. The real driver of long polling is PullRequestHoldService, which extends ServiceThread; its run method is the part to focus on:

public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // with long polling enabled, wait 5 seconds
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                // with short polling, wait the default 1 second
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // check the held requests
            this.checkHoldRequest();
            // log if the check took more than 5 seconds
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

The actual check is implemented in checkHoldRequest().

private void checkHoldRequest() {
    // iterate over the PullRequest map, keyed by topic@queueId
    for (String key : this.pullRequestTable.keySet()) {
        // parse out topic and queueId
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            // the current max offset of this queue
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                // notify that messages may have arrived
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

checkHoldRequest() walks pullRequestTable's keySet, parses each key into topic and queueId, fetches the current max offset of the corresponding MessageQueue, and compares it with the requested offset to decide whether new messages have arrived; the details live in notifyMessageArriving(topic, queueId, offset).

The check is asynchronous overall: the background thread PullRequestHoldService keeps running, while PullMessageProcessor submits PullRequests to be checked by putting them into pullRequestTable, where they wait to be processed.

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        // mpr is the ManyPullRequest looked up above with key=topic@queueId
        // if it exists, copy out its List<PullRequest>
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            // list of requests to be re-queued

            List<PullRequest> replayList = new ArrayList<PullRequest>();

            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                // if the latest known offset does not exceed the requested offset
                if (newestOffset <= request.getPullFromThisOffset()) {
                    // refresh it from the queue's actual max offset
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }
                // a newer offset means new messages have arrived
                if (newestOffset > request.getPullFromThisOffset()) {
                    // check the message against the filter expression
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }

                    if (match) {
                        try {
                            // the message matches: wake the request and respond to the client
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // timeout check
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // if now >= suspend timestamp + hold timeout, respond to the client with 'not found'
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                replayList.add(request);
            }

            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

notifyMessageArriving decides whether new messages have arrived and responds to the client accordingly:

  • compare maxOffset with the requested offset: if the latest offset is greater, new messages have arrived and are returned to the client
  • check for timeout: if now >= suspend timestamp + hold timeout, respond to the client with 'message not found'

The method is called both from the PullRequestHoldService loop and from DefaultMessageStore whenever a message is stored - periodic checking and event-driven notification working together.

Once the server responds, the client finishes processing and puts the pull request back into PullMessageService for the next round, so message pulling continues indefinitely.

Index files

RocketMQ maintains a hash index over messages, keyed by Message Key and Unique Key.
Like any hash map, the design has two basic ingredients:

  • hash slots
  • a linked-list structure for hash collisions

First, the layout of an index file:
(figure: index file structure)

As the figure shows, an IndexFile consists of an IndexHeader, the hash slots, and the index entries (the data):

  1. IndexHeader
    A 40-byte header recording statistics about the file:
    • beginTimestamp: the earliest store time among the messages indexed in this file
    • endTimestamp: the latest store time among them
    • beginPhyoffset: the smallest physical offset (commitlog offset) indexed in this file
    • endPhyoffset: the largest physical offset (commitlog offset) indexed in this file
    • hashslotCount: the number of hash slots (not the number in use; of little significance here)
    • indexCount: the number of index entries used so far; entries are stored sequentially
  2. Hash slots
    An IndexFile holds 5,000,000 hash slots by default. Each slot stores the number of the newest index entry whose key hashed into it.
  3. Index entries
    An index file holds 20,000,000 entries by default, each structured as:
    • hashcode: the key's hashcode
    • phyoffset: the message's physical offset
    • timedif: the difference between this message's store time and the first message's; negative means the entry is invalid
    • preIndexNo: the number of the previous entry in the same slot, forming the collision chain

Building the index

Every Message Key and Unique Key carried by a message gets its own index entry.

Two calculations matter:

  • locating the hash slot from the message key;
  • locating the start of the index entry from the slot count and the entry count.

  • First, hash the key and take the remainder by the slot count; each slot is a 4-byte int, so

    absSlotPos = 40 (header) + mod (remainder) * 4
    
  • Second, use the number of entries already written to find where this entry goes; each entry is 20 bytes (hash (int, 4) + commitLogOffset (long, 8) + timeDiff (int, 4) + previous slot value (int, 4)):

    absIndexPos = 40 (header) + hashSlotNum (slot count) * 4 + indexCount (entries written) * 20
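The two formulas can be checked directly. A sketch using the default slot count (class name illustrative; the real indexKeyHashMethod uses Math.abs to keep the hash non-negative):

```java
public class IndexFilePos {
    static final int HEADER_SIZE = 40;          // IndexHeader
    static final int HASH_SLOT_SIZE = 4;        // each slot: one int
    static final int INDEX_SIZE = 20;           // each entry: 4 + 8 + 4 + 4
    static final int HASH_SLOT_NUM = 5_000_000; // default slots per IndexFile

    static int absSlotPos(int keyHash) {
        int slotPos = (keyHash & 0x7fffffff) % HASH_SLOT_NUM; // keep it non-negative
        return HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
    }

    static long absIndexPos(int indexCount) {
        return HEADER_SIZE + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE + (long) indexCount * INDEX_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(absSlotPos(0));  // 40: slot 0 sits right after the header
        System.out.println(absIndexPos(0)); // 20000040: entries start after all the slots
    }
}
```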
    
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // hash of the key
        int keyHash = indexKeyHashMethod(key);
        // index of the hash slot
        int slotPos = keyHash % this.hashSlotNum;
        // 40-byte header + slot index * slot size (an int, 4 bytes)
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            // time difference from the file's first message
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            // start offset of this index entry
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
            // write the hashcode, physical offset, time diff and previous slot value in order
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // point the hash slot at this entry (the current entry count)
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
    // ...

    return false;
}

With the index in place, finding a message by Message Key or Unique Key is straightforward.

For example, when querying by Unique Key through the RocketMQ client tools: on the Broker side, the Unique Key is hashed to locate the hash slot, which leads to the index entry; the entry yields the message's physical offset, and the message is then read directly from the CommitLog file.
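The lookup path - hash slot, newest entry, then the preIndexNo chain on collisions - can be modeled in memory. A minimal sketch, not the on-disk format (names illustrative; entry numbers are 1-based as in IndexFile, with 0 meaning an empty slot):

```java
import java.util.ArrayList;
import java.util.List;

public class IndexLookup {
    // in-memory stand-ins for the on-disk structures
    static int[] slots = new int[16];                // 0 = empty (invalidIndex)
    static List<long[]> entries = new ArrayList<>(); // {keyHash, phyOffset, preIndexNo}

    static void putKey(int keyHash, long phyOffset) {
        int slot = (keyHash & 0x7fffffff) % slots.length;
        int prev = slots[slot];                       // previous newest entry for this slot
        entries.add(new long[] { keyHash, phyOffset, prev });
        slots[slot] = entries.size();                 // entry numbers are 1-based
    }

    // Walk the collision chain from the newest entry backwards, keeping matching hashes.
    static List<Long> lookup(int keyHash) {
        List<Long> offsets = new ArrayList<>();
        int slot = (keyHash & 0x7fffffff) % slots.length;
        for (int no = slots[slot]; no > 0; no = (int) entries.get(no - 1)[2]) {
            long[] e = entries.get(no - 1);
            if (e[0] == keyHash) offsets.add(e[1]);
        }
        return offsets;
    }

    public static void main(String[] args) {
        putKey(7, 100); putKey(7, 200); putKey(23, 300); // 23 % 16 == 7: a collision
        System.out.println(lookup(7));  // [200, 100]
        System.out.println(lookup(23)); // [300]
    }
}
```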

Checkpoint

The checkpoint file records the flush points of the CommitLog, ConsumeQueue and Index files. The file has a fixed length of 4 KB, of which only the first 24 bytes are used, laid out as shown below:
(figure: checkpoint structure)

Its parts, briefly:

  1. physicMsgTimestamp: flush timestamp of the commitLog files
  2. logicsMsgTimestamp: flush timestamp of the consume queue files
  3. indexMsgTimestamp: flush timestamp of the index files
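A sketch of that 24-byte layout; StoreCheckpoint reads and writes these three longs at fixed offsets through its mapped buffer (class name illustrative):

```java
import java.nio.ByteBuffer;

public class CheckpointLayout {
    // Only the first 24 bytes of the 4 KB checkpoint file are used: three longs.
    static ByteBuffer encode(long physicMsgTs, long logicsMsgTs, long indexMsgTs) {
        ByteBuffer buf = ByteBuffer.allocate(24);
        buf.putLong(0, physicMsgTs);  // commitLog flush timestamp
        buf.putLong(8, logicsMsgTs);  // consume queue flush timestamp
        buf.putLong(16, indexMsgTs);  // index file flush timestamp
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(1L, 2L, 3L);
        System.out.println(buf.getLong(8)); // 2
    }
}
```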

File cleanup

RocketMQ writes the CommitLog and ConsumeQueue files sequentially: every write lands on the last CommitLog or ConsumeQueue file, and once the next file is created, the earlier files are never updated again.

RocketMQ's rule for removing expired files:
a file that is not the current write file and has not been updated within a certain interval is considered expired and may be deleted, regardless of whether every message in it has been consumed. The default retention per file is 72 hours; it can be changed via fileReservedTime (in hours) in the Broker configuration.

When BrokerController#start runs at startup, it schedules several background tasks, one of which cleans up expired files:
DefaultMessageStore#addScheduleTask

private void addScheduleTask() {
    // run cleanFilesPeriodically every 10s to check whether expired files need cleaning; the interval is cleanResourceInterval, 10s by default
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    // ...
}

private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}

CommitLog file expiry cleanup

public void run() {
    try {
        // step 1: try to delete expired files;
        this.deleteExpiredFiles();
        // step 2: retry deleting files left hanging in step 1 (still referenced by other threads).
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // fileReservedTime: retention time; a file whose last update is older than this counts as expired and deletable.
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // deletePhysicFilesInterval: pause between deleting two physical files, since one sweep may delete more than one.
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    /**
     * If a file to be deleted is still referenced by other threads (reference count > 0, e.g. a read in progress),
     * this deletion attempt is blocked, and the timestamp of the first attempt is recorded.
     * destroyMapedFileIntervalForcibly is the maximum grace period after that first refusal:
     * within it the file may keep refusing deletion (each attempt also drops the reference count by 1000);
     * once it elapses, the file is destroyed forcibly.
     */
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // RocketMQ通过deleteWhen设置⼀天的固定时间执⾏⼀次删除过期⽂件操作,默认为凌晨4点。
    boolean timeup = this.isTimeToDelete();
    // 重点分析一下磁盘不足的判断依据。
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {
        // 通过调⽤executeDeleteFilesManual⽅法⼿⼯触发过期⽂件删除
        // 并且最多只能调用20次
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;
        // 在前面isSpaceToDelete的判断中,如果发现磁盘空间不足,cleanImmediately会被置为true;再结合cleanFileForciblyEnable(默认开启)决定是否立即强制清理
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
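deleteExpiredFiles中"过期"的核心判断,可以抽象为"最后一次更新时间加上保留时长是否早于当前时间"。下面是一个脱离RocketMQ依赖的最小示意(类名、方法名均为示意,非源码):

```java
import java.util.concurrent.TimeUnit;

public class ExpireCheck {

    /**
     * 判断文件是否过期:最后修改时间 + 保留时长(小时)早于等于当前时间即过期。
     * 对应源码中 fileReservedTime *= 60 * 60 * 1000 之后与文件时间戳的比较逻辑。
     */
    public static boolean isExpired(long lastModifiedMillis, long fileReservedHours, long nowMillis) {
        long reservedMillis = TimeUnit.HOURS.toMillis(fileReservedHours);
        return lastModifiedMillis + reservedMillis <= nowMillis;
    }
}
```

例如默认保留72小时:文件最后修改后的第73小时检查会判定过期,第71小时则不会。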

private boolean isSpaceToDelete() {
    // 获取maxUsedSpaceRatio,表示commitlog、consumequeue文件所在磁盘分区的最大使用量,
    // 如果超过该值,则需要立即清除过期文件。
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

    cleanImmediately = false;

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        // 通过File#getTotalSpace()获取commitlog所在磁盘分区总的存储容量,
        // 通过File#getFreeSpace()获取commitlog目录所在磁盘文件剩余容量并得出当前该分区的物理磁盘使用率physicRatio 。
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        /**
         * RocketMQ另外提供了两个与磁盘空间使用率相关的系统级参数:
         *
         * -Drocketmq.broker.diskSpaceWarningLevelRatio=0.90:如果磁盘分区使用率超过该阈值,将设置磁盘不可写,此时会拒绝新消息的写入。
         * -Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85:如果磁盘分区使用率超过该阈值,建议立即执行过期文件清除,但不会拒绝新消息的写入。
         * 判断磁盘是否已满时,会将当前物理磁盘使用率physicRatio与maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio逐一比较,
         * 达到上述任一阈值即返回true表示磁盘已满,需要进行过期文件删除操作。
         */
        if (physicRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }

            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }

        if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }

    // 和上面一样,此处是使用store根目录,这样可以确保除了commitlog外如果其他的数据超过限制了也会执行限制写入和文件清理
    // 因为commitlog文件地址配置可以和rootPath不同

    return false;
}
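isSpaceToDelete依赖UtilAll.getDiskPartitionSpaceUsedPercent计算磁盘分区使用率,其思路等价于下面基于java.io.File的示意实现(非源码):

```java
import java.io.File;

public class DiskUsage {

    /**
     * 计算路径所在磁盘分区的使用率:(总容量 - 剩余容量) / 总容量。
     * 路径不存在或容量获取失败时返回 -1,对应源码中 physicRatio < 0 的分支。
     */
    public static double usedPercent(String path) {
        File file = new File(path);
        if (!file.exists()) {
            return -1;
        }
        long total = file.getTotalSpace();
        if (total <= 0) {
            return -1;
        }
        long free = file.getFreeSpace();
        return (double) (total - free) / total;
    }
}
```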

ConsumeQueue文件过期清理

private void deleteExpiredFiles() {
    // 清理文件时间间隔,当清理一次文件成功之后,下一次清理至少要经过deleteLogicsFilesInterval配置时间
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        this.lastPhysicalMinOffset = minOffset;

        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
        // 从consumeQueueTable中遍历每个topic的ConsumeQueue
        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 遍历该ConsumeQueue的文件,删除其最大消息物理偏移量小于minOffset的文件(这些文件指向的CommitLog消息已被清理)
                int deleteCount = logic.deleteExpiredFile(minOffset);

                if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                    try {
                        Thread.sleep(deleteLogicsFilesInterval);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
        // 清理索引文件
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}
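logic.deleteExpiredFile(minOffset)的判断依据是:一个ConsumeQueue文件引用到的最大CommitLog物理偏移量如果小于CommitLog当前的最小偏移量,说明它指向的消息已全部被清理,该文件即可删除。可以用下面的示意代码理解(类与字段均为示意,非源码):

```java
import java.util.Iterator;
import java.util.List;

public class ConsumeQueueCleaner {

    /** 示意:一个ConsumeQueue文件只保留它引用到的最大CommitLog物理偏移量 */
    public static class QueueFile {
        public final long maxCommitLogOffset;

        public QueueFile(long maxCommitLogOffset) {
            this.maxCommitLogOffset = maxCommitLogOffset;
        }
    }

    /**
     * 删除所有最大物理偏移量小于CommitLog最小偏移量的文件,返回删除的文件数。
     */
    public static int deleteExpiredFiles(List<QueueFile> files, long commitLogMinOffset) {
        int deleteCount = 0;
        Iterator<QueueFile> it = files.iterator();
        while (it.hasNext()) {
            if (it.next().maxCommitLogOffset < commitLogMinOffset) {
                it.remove();
                deleteCount++;
            }
        }
        return deleteCount;
    }
}
```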

路由注册、发现、剔除

路由注册

【Broker】在启动时向【NameServer】发起注册请求,【NameServer】处理流程如下:

DefaultRequestProcessor#processRequest

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }


    /**
     * 处理不同种类的心跳,broker、producer、consumer 都会发送心跳
     */
    switch (request.getCode()) {
        ...
        case RequestCode.REGISTER_BROKER: // 注册Broker
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            //如果RocketMQ的版本大于3.0.11
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER: // Broker下线
            return this.unregisterBroker(ctx, request);
        ...
}

DefaultRequestProcessor#registerBrokerWithFilterServer

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    ...

    // 注册Broker流程
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    // 构造返回结果
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

RouteInfoManager#registerBroker

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            //打开写锁,说明路由注册是同步串行操作过程。
            this.lock.writeLock().lockInterruptibly();
            //对cluster路由表进行维护
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            //对broker路由表进行维护
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            //对brokerLiveTable进行维护
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            // 如果是从Broker的话,会在返回结果上加上masterAddr和haServerAddr
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
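registerBroker中"同一个IP:PORT在brokerAddrTable中只允许存在一条记录"的处理(slave切换为master时先删除旧的<1, IP:PORT>再写入<0, IP:PORT>),可以抽出为如下示意方法(方法名为示意,非源码):

```java
import java.util.Iterator;
import java.util.Map;

public class BrokerAddrs {

    /**
     * 写入前先移除"地址相同但brokerId不同"的旧记录,再put新记录,
     * 保证slave切换为master(brokerId 1 -> 0)后同一地址只保留一条。
     * 返回被覆盖的旧地址(首次注册返回null),对应源码中的oldAddr。
     */
    public static String register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            if (brokerAddr != null && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }
        return brokerAddrs.put(brokerId, brokerAddr);
    }
}
```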

路由发现

路由发现是⾮实时的,当【topic】路由出现变化(新增或减少【Broker】),【NameServer】不主动推送给客户端,⽽是由客户端定时拉取主题最新的路由信息。

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    ...
    // 根据主题获取路由信息,NameServer不主动推,保证NameServer简单⾼效
    case RequestCode.GET_ROUTEINFO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    ...
}

DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // 根据nameserverController拿到RouteInfoManager,根据路由获取相关的路由信息来填充TopicRouteData对象
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // 查找到topic的路由信息,拼装返回结果
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // 没有查找到topic的路由信息
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

Broker剔除

RocketMQ有两个触发点来触发路由删除:

  1. 【NameServer】启动时会开启一个定时器,每10秒轮询一次活跃【Broker】列表,如果某个【Broker】已经120秒没有上报心跳数据,就将该【Broker】剔除。
    NameServer主动剔除Broker
  2. 【Broker】在正常关闭的情况下,会向【NameServer】发起unregisterBroker请求。
    BrokerController#shutdown

    public void shutdown() {
     ...
    
     this.unregisterBrokerAll();
    
     ...
    }
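第1种触发点中基于心跳超时的剔除判断(RouteInfoManager#scanNotActiveBroker)可以简化成下面的示意:存活表简化为"地址 -> 最后心跳时间戳"的映射,120秒即源码中的BROKER_CHANNEL_EXPIRED_TIME常量(非源码):

```java
import java.util.Iterator;
import java.util.Map;

public class BrokerLiveScanner {

    /** 心跳超时阈值:120秒,对应源码中的BROKER_CHANNEL_EXPIRED_TIME */
    public static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    /**
     * 扫描broker存活表,剔除超过120秒没有上报心跳的broker,返回剔除数量。
     * 对应NameServer每10秒执行一次的scanNotActiveBroker的简化版本。
     */
    public static int scanNotActiveBroker(Map<String, Long> brokerLiveTable, long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```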
    

NameServer处理剔除Broker

DefaultRequestProcessor#unregisterBroker

public RemotingCommand unregisterBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    ...

    this.namesrvController.getRouteInfoManager().unregisterBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId());

    ...
}

RouteInfoManager#unregisterBroker

public void unregisterBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddr
            );

            this.filterServerTable.remove(brokerAddr);

            boolean removeBrokerName = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                String addr = brokerData.getBrokerAddrs().remove(brokerId);
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed",
                    brokerAddr
                );

                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );

                    removeBrokerName = true;
                }
            }

            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);

                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                this.removeTopicByBrokerName(brokerName);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    }
}

路由注册、剔除、发现全流程


Consumer

消费者类图

启动

这里我们拿操作最为复杂的DefaultMQPushConsumer为例进行剖析:
DefaultMQPushConsumer#start

public void start() throws MQClientException {
    // 消费组
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // 启动DefaultMQPushConsumerImpl
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

DefaultMQPushConsumerImpl启动有16个步骤。

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            /**
             * 1、基本的参数检查,
             * group name不能是DEFAULT_CONSUMER;
             * 检查配置信息:
             * 主要检查消费者组(consumeGroup)、消息消费方式(messageModel)、
             * 消息消费开始偏移量(consumeFromWhere)、消息队列分配算法(AllocateMessageQueueStrategy)、
             * 订阅消息主题(Map<topic,sub expression)、消息回调监听器(MessageListener)、
             * 顺序消息模式时是否只有一个消息队列等等。
             */
            this.checkConfig();
            /**
             * 2、将DefaultMQPushConsumer的订阅信息组装成Subscription放到RebalanceImpl中
             * 创建重试topic并订阅
             */
            this.copySubscription();
            /**
             * 3、修改InstanceName参数值为PID
             */
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            /**
             * 4、新建一个MQClientInstance
             * MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clientId和MQClientInstance
             * 这样做保证了一个进程只有一个实例,这个实例在一个JVM中消费者和生产者共用
             * MQClientInstance中维护了
             *   对Broker和NameServer的请求管理MQClientAPIImpl和MQAdminImpl;
             *   拉取消息服务PullService;
             *   负载均衡服务RebalanceService;
             *   消费状态管理器ConsumerStatsManager。
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            /**
             * 5、消费者组信息和消费模式设置(从DefaultMQPushConsumer中复制到RebalanceImpl)
             */
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            /**
             * 6、队列分配算法,从这里可以知道设置分配算法需要在consumer.start方法执行之前,否则不会同步给RebalanceImpl
             */
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // pullAPIWrapper拉取消息的API包装类,主要有消息的拉取方法和接收拉取到的消息
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            /**
             * 7、消息被客户端过滤时会回调hook
             */
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            /**
             * 8、消费进度存储,如果是集群模式,使用远程存储RemoteBrokerOffsetStore,
             * 如果是广播模式,则使用本地存储LocalFileOffsetStore,
             */
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            /**
             * 9、加载消息进度
             * 消费模式是BROADCASTING模式时会从本地文件中读取到内存中并维护;
             * CLUSTERING模式下,offset的维护是在broker的,所以这里RemoteBrokerOffsetStore#load方法什么都没做。
             *
             * 当Consumer运行起来之后,offset也是需要RemoteBrokerOffsetStore联合Broker共同维护的,但在Consumer端不会将offset落地到磁盘。
             */
            this.offsetStore.load();
            /**
             * 10、判断是顺序消费还是并发消费,分别创建ConsumeMessageOrderlyService或者ConsumeMessageConcurrentlyService
             */
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                // ConsumeMessageOrderlyService内部消费也是多线程的,但在消费拉取到的消息时会对MessageQueue加锁
                // 保证每个消息队列内的消息消费是有序的,所以RocketMQ顺序消息的有序性是针对单个队列的,不同队列之间不保证有序
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                // 与顺序消费相比,这里还会多启动一个定时清理过期消息的任务(cleanExpireMsg)
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            /**
             * 11、启动消息消费服务
             */
            this.consumeMessageService.start();
            /**
             * 12、注册消费者
             */
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            /**
             * 13、启动MQClientInstance,会启动PullMessageService和RebalanceService
             */
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        // ...
    }
    /**
     * 14、从NameServer更新topic路由和订阅信息
     */
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    /**
     * 检测broker状态
     */
    this.mQClientFactory.checkClientInBroker();
    /**
     * 15、发送心跳,同步consumer配置到broker,同步FilterClass到FilterServer(PushConsumer)
     */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    /**
     * 16、做一次re-balance
     */
    this.mQClientFactory.rebalanceImmediately();
}

从注释上我们能比较清楚地了解每一步的功能。

上面代码中第13步调用的MQClientInstance#start方法比较复杂:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // ...更新nameserver的地址
                // 创建consumer请求nameserver和broker的连接
                this.mQClientAPIImpl.start();
                /*
                * 开启定时器
                * 1. 更新NameServer集群信息
                * 2. 更新路由信息
                * 3. 清理已经不使用的broker,向broker发送心跳
                * 4. 维护consumerOffset,consumer定期同步本地的offset到broker
                * 5. 自适应MQPushConsumer的消费线程池
                */
                this.startScheduledTask();
                // consumer中启动拉取消息
                this.pullMessageService.start();
                // 触发定期rebalance broker和consumer
                this.rebalanceService.start();
                // 启动客户端内置的Producer(消费失败的消息重新发回Broker时会用到)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

主要就是几个步骤:

  1. 创建【Consumer】与【Broker】的连接
  2. 开启多个定时器
  3. 启动拉取消息服务
  4. 定期rebalance

消息拉取

上面我们说了在【Consumer】启动过程中会调用PullMessageService#start方法,而PullMessageService实现了Runnable接口,那么来看看PullMessageService#run方法。

public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // 从LinkedBlockingQueue中拉取pullRequest
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        // ...
    }
    log.info(this.getServiceName() + " service end");
}

核心就是this.pullRequestQueue.take(),这里的pullRequestQueue是一个阻塞队列,那么PullRequest是什么时候放入阻塞队列的呢?有两个入口:

  1. 立即放入PullMessageService#executePullRequestImmediately
    public void executePullRequestImmediately(final PullRequest pullRequest) {
     try {
         this.pullRequestQueue.put(pullRequest);
     } catch (InterruptedException e) {
         log.error("executePullRequestImmediately pullRequestQueue.put", e);
     }
    }
    
  2. 稍后放入PullMessageService#executePullRequestLater
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
     if (!isStopped()) {
         this.scheduledExecutorService.schedule(new Runnable() {
             @Override
             public void run() {
                 PullMessageService.this.executePullRequestImmediately(pullRequest);
             }
         }, timeDelay, TimeUnit.MILLISECONDS);
     } else {
         log.warn("PullMessageServiceScheduledThread has shutdown");
     }
    }
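这两个入口体现了同一种模式:用阻塞队列承接拉取任务,需要延迟时通过调度线程池把任务"稍后再放回队列"。脱离RocketMQ,这个模式可以这样示意(类名为示意,非源码):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedRequeue<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** 立即放入队列,对应executePullRequestImmediately(队列无界,offer不会失败) */
    public void putNow(T task) {
        queue.offer(task);
    }

    /** 延迟delayMillis毫秒后再放入队列,对应executePullRequestLater */
    public void putLater(T task, long delayMillis) {
        scheduler.schedule(() -> putNow(task), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** 阻塞取出任务,对应PullMessageService#run中的pullRequestQueue.take() */
    public T take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```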
    

这里先不做具体解析,后面的流程里会具体用到它们。我们先来看看PullRequest中有什么:

// 消费组
private String consumerGroup;
// 待拉取的消息队列
private MessageQueue messageQueue;
// 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
private ProcessQueue processQueue;
// 待拉取消息的偏移量
private long nextOffset;
// 是否锁定
private boolean lockedFirst = false;

从这里我们可以看到两个重要的属性:

  • MessageQueue
  • ProcessQueue

MessageQueue我们都知道指的是消息队列,那么ProcessQueue是什么,有什么作用呢?带着这个问题,我们来看看拉取消息的流程。

PullMessageService#pullMessage

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

从这里我们可以看到实际拉取消息是由DefaultMQPushConsumerImpl完成的:

这里DefaultMQPushConsumerImpl#pullMessage流程较长,我们分段解析:

// 1.获取处理队列ProcessQueue
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 2.如果dropped=true,那么return
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 3.然后更新该消息队列最后一次拉取的时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
    // 4.如果消费者服务状态不为ServiceState.RUNNING,默认延迟3秒再执行
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    // 延迟执行放入pullRequest操作
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    return;
}
// 5.是否暂停,如果有那么延迟3s执行,目前我没有发现哪里有调用暂停,可能是为以后预留
if (this.isPause()) {
    log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}

这部分属于拉取数据的一些前置操作,状态验证和属性设置等。

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

// 6.消息的拉取会有流量控制,当processQueue没有消费的消息的数量达到(默认1000个)会触发流量控制
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    // PullRequest延迟50ms后,放入LinkedBlockQueue中
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // ...每触发1000次告警一次
    return;
}
// 7.当processQueue中没有消费的消息体总大小大于(默认100m)时,触发流控
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // ...每触发1000次告警一次
    return;
}
// 8.并发消息处理,判断processQueue中消息的最大间距,就是消息的最大位置和最小位置的差值如果大于默认值2000,那么触发流控
if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // ...每触发1000次告警一次
        return;
    }
} else {
    // 8.顺序消息处理
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            // ...
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
}

此段代码主要是为了流量控制,当发现【Consumer】已拉取但未消费的数据量达到阈值时,调用上面我们所说的PullMessageService#executePullRequestLater实现延迟拉取。
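上述三类流控条件(未消费条数、未消费总大小、并发消费时的最大偏移量跨度)可以归纳为一个纯函数判断,阈值取源码默认值1000条、100MiB、2000跨度(类名、方法名为示意,非源码):

```java
public class PullFlowControl {
    public static final int PULL_THRESHOLD_FOR_QUEUE = 1000;      // 默认未消费消息条数上限
    public static final int PULL_THRESHOLD_SIZE_MIB = 100;        // 默认未消费消息总大小上限(MiB)
    public static final int CONSUME_CONCURRENTLY_MAX_SPAN = 2000; // 并发消费时默认最大偏移量跨度

    /**
     * 任一条件命中即触发流控,本次拉取延迟执行。
     * 顺序消费不检查跨度,因为其消费进度是按队列顺序推进的,跨度天然受限。
     */
    public static boolean shouldDelay(long cachedCount, long cachedSizeMiB, long maxSpan, boolean consumeOrderly) {
        if (cachedCount > PULL_THRESHOLD_FOR_QUEUE) {
            return true;
        }
        if (cachedSizeMiB > PULL_THRESHOLD_SIZE_MIB) {
            return true;
        }
        return !consumeOrderly && maxSpan > CONSUME_CONCURRENTLY_MAX_SPAN;
    }
}
```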

// 9.获取主题订阅信息
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}

这里主要是校验订阅信息。

// 10.创建了拉取消息回调
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            // 处理拉取到的消息,消息过滤和消息的一些属性设置
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        // 立刻再去拉数据
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        // 拉到的消息顺序保存在msgFoundList
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                        // 将消息放入ProcessQueue中
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // 提交消费请求
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        // Schedule the next pull
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                // ...handling for other statuses
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        // ...alert on pull-message exception

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
};

This creates a pull-message callback. It is passed in when the pull is executed later, and is invoked when messages are pulled back or when the pull fails. Its main responsibilities are:

  1. Parse the messages
  2. Filter the messages
  3. Decide whether the next pull targets the master or a slave 【Broker】
  4. Maintain the consumer's consumption statistics
  5. Save the pulled messages into the 【ProcessQueue】
  6. Submit the consume request, to be processed by the concrete ConsumeMessageService
  7. Schedule the next pull

// 11. In clustering mode, read the MessageQueue's commit log offset from memory
// ...

// 12. Fetch the subscription information again to obtain subExpression and classFilter, mainly for message filtering
// ...

// 13. Build the pull-message system flag: whether commitOffset, suspend, subExpression and classFilter are enabled
int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);
// 14. Call the pull API to pull messages
this.pullAPIWrapper.pullKernelImpl(
    pullRequest.getMessageQueue(), // message queue to consume from
    subExpression, // which tags are subscribed
    subscriptionData.getExpressionType(),
    subscriptionData.getSubVersion(), // subscription version
    pullRequest.getNextOffset(), // pull offset
    this.defaultMQPushConsumer.getPullBatchSize(), // how many messages to pull from the broker
    sysFlag, // system flags: FLAG_COMMIT_OFFSET FLAG_SUSPEND FLAG_SUBSCRIPTION FLAG_CLASS_FILTER
    commitOffsetValue, // latest committed offset of this message queue (from memory)
    BROKER_SUSPEND_MAX_TIME_MILLIS, // max time the broker may suspend the request, in ms, default 15s
    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // timeout, default 30s
    CommunicationMode.ASYNC, // communication mode: asynchronous
    pullCallback // pull callback
);

This is where the pull is actually executed. Let's look at the PullAPIWrapper#pullKernelImpl method:

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // If the broker suggested pulling from a slave node, switch to pulling from the slave here
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        // ...check version
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        // Assemble the pull message request header
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        // Set the max time the broker may block, default 15s: BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);
        // Get the broker address to pull from
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        // Execute the message pull
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
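The sysFlag built in step 13 and cleared for slaves above is plain bit arithmetic. Below is a minimal re-implementation sketch; the constant values mirror those in `PullSysFlag` but are an assumption, not verified against a specific RocketMQ version:

```java
// Sketch of how PullSysFlag composes the pull-request flags (values assumed
// to match org.apache.rocketmq.common.sysflag.PullSysFlag).
public final class PullSysFlagSketch {
    private static final int FLAG_COMMIT_OFFSET = 0x1;      // broker should persist the consumer offset
    private static final int FLAG_SUSPEND       = 0x1 << 1; // enable long polling on the broker
    private static final int FLAG_SUBSCRIPTION  = 0x1 << 2; // request carries a subscription expression
    private static final int FLAG_CLASS_FILTER  = 0x1 << 3; // server-side class filtering

    public static int buildSysFlag(boolean commitOffset, boolean suspend,
                                   boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend)      flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter)  flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // Slave brokers do not persist consumer offsets, so the commit-offset bit is cleared
    public static int clearCommitOffsetFlag(int sysFlag) {
        return sysFlag & ~FLAG_COMMIT_OFFSET;
    }
}
```

This explains the `findBrokerResult.isSlave()` branch above: the same request header is reused, only the commit-offset bit is masked out.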

Long-polling client

Long polling, as the name suggests, differs from the regular polling approach. With regular polling, the client sends a request and the server responds immediately upon receiving it.

Long polling is still polling in essence. The difference is that when the server receives a request, it does not return data immediately; instead it holds the request and checks whether the server-side data has been updated. If there is an update, it responds to the client; if there is no data, it keeps holding the request and checking for updates until data arrives or the long-polling timeout is reached, and only then returns.

In the code above, brokerSuspendMaxTimeMillis (default 15s) is the maximum time the 【Broker】 may block during a message pull. If the 【Broker】 has no messages when the pull arrives, it blocks; otherwise it packs the message body and returns immediately.

RocketMQ's long polling is implemented on the 【Broker】 side, specifically in PullMessageProcessor. It was covered in the 【ConsumeQueue】 pull-message section of the 【Broker】 chapter above.
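The hold-and-check behavior described above can be sketched with a toy server-side store. This is a hypothetical simplification: the real PullRequestHoldService is notification-driven rather than a periodic check loop, but the visible contract is the same — respond early when data arrives, or time out after the suspend window:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal hold-and-check sketch of broker-side long polling (hypothetical names).
public class LongPollSketch {
    private final BlockingQueue<String> store = new LinkedBlockingQueue<>();

    // A producer writes a message into the store
    public void put(String msg) { store.offer(msg); }

    // Hold the request up to suspendMillis, returning early once data arrives
    public String poll(long suspendMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + suspendMillis;
        while (System.currentTimeMillis() < deadline) {
            String msg = store.poll(100, TimeUnit.MILLISECONDS); // check periodically
            if (msg != null) {
                return msg; // data arrived: respond to the client now
            }
        }
        return null; // timed out: respond with "no new message"
    }
}
```

In RocketMQ's terms, `suspendMillis` corresponds to brokerSuspendMaxTimeMillis (15s), and the client's own network timeout (CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, 30s) is deliberately larger so the hold can expire on the broker first.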

Orderly and concurrent consumption

RocketMQ's consumption model is a single 【Consumer】 instance plus multiple 【worker】 threads.
(Figure: RocketMQ consumption model)

RocketMQ assigns each queue a PullRequest and puts it into pullRequestQueue. The PullMessageService thread keeps taking PullRequests from pullRequestQueue and pulling messages; pulled messages are saved into the ProcessQueue, and a consume request is then submitted to the ConsumeMessageService for processing:
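The dispatch loop just described can be sketched as follows (hypothetical, simplified names; the real PullMessageService also supports delayed resubmission via executePullRequestLater):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the PullMessageService dispatch loop: one thread keeps taking
// PullRequests off a blocking queue; re-enqueueing a request drives the next round.
public class PullLoopSketch {
    public static class PullRequest {
        public final String queue;
        public PullRequest(String queue) { this.queue = queue; }
    }

    private final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;

    // Called by the rebalance logic and by the pull callback to schedule the next pull
    public void executePullRequestImmediately(PullRequest r) { pullRequestQueue.offer(r); }

    public int pending() { return pullRequestQueue.size(); }

    public void run() {
        while (!stopped) {
            try {
                PullRequest r = pullRequestQueue.take(); // blocks until a request is queued
                pullMessage(r);                          // would hand results to the ProcessQueue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    protected void pullMessage(PullRequest r) { /* pull, store, resubmit the request */ }

    public void shutdown() { stopped = true; }
}
```

Because each queue has exactly one PullRequest circulating through this loop, at most one pull for a given queue is in flight at a time.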

public interface ConsumeMessageService {
    // ...
    // Submit a consume task to the ConsumeMessageService
    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

ConsumeMessageService has two implementations:

  • ConsumeMessageConcurrentlyService: concurrent consumption
  • ConsumeMessageOrderlyService: orderly consumption

Both services maintain their own ConsumeRequest class, which implements the Runnable interface. For either service, submitting a consume request really means submitting a ConsumeRequest to a thread pool, so the key method to look at is ConsumeRequest#run.
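The submit-to-thread-pool pattern shared by both services can be sketched like this (hypothetical names, with strings standing in for MessageExt):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the submitConsumeRequest pattern: wrap the pulled batch in a
// Runnable ConsumeRequest and hand it to a consumer thread pool; run()
// invokes the user-registered listener.
public class ConsumeServiceSketch {
    public interface Listener { boolean consume(List<String> msgs); }

    private final ExecutorService consumeExecutor = Executors.newFixedThreadPool(4);
    private final Listener listener;

    public ConsumeServiceSketch(Listener listener) { this.listener = listener; }

    class ConsumeRequest implements Runnable {
        private final List<String> msgs;
        ConsumeRequest(List<String> msgs) { this.msgs = msgs; }

        @Override public void run() {
            boolean ok = listener.consume(Collections.unmodifiableList(msgs));
            if (!ok) {
                // a real service would process the result here: ack, retry, or send back
            }
        }
    }

    public void submitConsumeRequest(List<String> msgs) {
        consumeExecutor.submit(new ConsumeRequest(msgs));
    }

    public void shutdown() { consumeExecutor.shutdown(); }
}
```

The difference between the two real services lies entirely inside run(): the concurrent one calls the listener directly, while the orderly one first acquires per-queue locks, as shown below.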

Concurrent consumption

public void run() {
    // ...

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    // ...
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // Core logic: invoke the listener's consumeMessage method
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
        hasException = true;
    }
    // ...

    // Handle the result
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

Concurrent consumption comes down to two key steps:

  1. Invoke the listener.consumeMessage method
  2. Handle the result

Orderly consumption

How RocketMQ guarantees orderly consumption:

  1. RocketMQ creates an object lock for each message queue, so while that queue is being processed in the thread pool, the next round of consumption must wait until processing finishes. This serializes consumption of the same queue within the current 【Consumer】.
  2. It asks the 【Broker】 to lock the queue currently being consumed in order, preventing the queue from being reassigned to another consumer mid-consumption, which would break the ordering.

When ConsumeMessageOrderlyService starts in clustering mode, it launches a single-threaded scheduled task (initial delay 1 second, interval 20 seconds) that executes RebalanceImpl#lockAll(). This sends a LOCK_BATCH_MQ request to the master 【Broker】, asking it to lock all 【MessageQueue】s under that ConsumerId; once the 【Broker】 returns the successfully locked MQs, the client also locks the corresponding 【ProcessQueue】s.
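The periodic lock renewal can be sketched with a scheduled executor (hypothetical class name; the delay and period are parameterized here for testability, with 1s/20s matching the timing described above):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the periodic queue-lock renewal done by ConsumeMessageOrderlyService
// in clustering mode: a single-threaded scheduler keeps re-running lockAll()
// so the broker-side batch locks never silently expire mid-consumption.
public class LockRenewalSketch {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    // delayMillis = 1000 and periodMillis = 20000 reproduce the real timing
    public void start(Runnable lockAll, long delayMillis, long periodMillis) {
        scheduler.scheduleAtFixedRate(lockAll, delayMillis, periodMillis,
            TimeUnit.MILLISECONDS);
    }

    public void shutdown() { scheduler.shutdownNow(); }
}
```

The renewal interval (20s) is shorter than the broker-side lock lifetime, which is why ProcessQueue#isLockExpired is checked in the consume loop below: if renewal fails long enough for the lock to lapse, consumption for that queue pauses until the lock is reacquired.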

public void run() {
    // ...

    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // broadcasting mode || (ProcessQueue locked successfully && lock not expired)
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            // Consume in a loop until continueConsume becomes false. If the listener processes the messages without error it normally returns SUCCESS, so continueConsume stays true; for other return statuses RocketMQ sets continueConsume to false
            for (boolean continueConsume = true; continueConsume; ) {
                // ...

                // ...in CLUSTERING mode, if the ProcessQueue is not locked, schedule consumption again later

                // ...in CLUSTERING mode, if the ProcessQueue lock has expired, schedule consumption again later

                // ...if this round has been consuming for too long, give up this submission for now

                // If the message count exceeds the batch limit, consume in batches
                final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    ConsumeMessageContext consumeMessageContext = null;
                    // ...

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // Lock the process queue for consumption
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                            break;
                        }
                        // Execute the consume logic
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        hasException = true;
                    } finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    // ...handle the consume result; continueConsume may become false here
                }
            }
        } else {
            // ...failed to acquire the lock for this queue; consume again later
        }
    }
}

Complete consumption flow

(Figure: complete consumption flow)
