原创

RocketMQ消息类型

普通消息

消息发送

  1. 同步发送
    普通消息同步发送

    SendResult sendResult = producer.send(msg);
    

    这种消息会在发送后返回SendResult,发送方通过SendResult拿到返回结果后再去做下一步处理。常用于重要通知邮件、报名短信通知、营销短信系统等。

  2. 异步发送
    普通消息异步发送

    producer.send(msg, new SendCallback() {
     @Override
     public void onSuccess(SendResult sendResult) {}
    
     @Override
     public void onException(Throwable e) {}
    });
    

    异步发送⼀般⽤于链路耗时较⻓,对响应时间较为敏感的业务场景,例如⽤户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

  3. 单向发送
    普通消息单向发送

    producer.sendOneway(msg);
    

    发送⽅只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此⽅式发送消息的过程耗时⾮常短,⼀般在微秒级别。适⽤于某些耗时⾮常短,但对可靠性要求并不⾼的场景,例如⽇志收集。

三种发送⽅式的对⽐:
| 发送⽅式 | 发送TPS | 发送结果反馈 | 可靠性 |
| --- | --- | --- | --- |
| 同步发送 | 快 | 有 | 不丢失 |
| 异步发送 | 快 | 有 | 不丢失 |
| 单向发送 | 最快 | ⽆ | 可能丢失 |

消息消费

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // TODO: 处理消息msgs
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

顺序消息

顺序消息(FIFO 消息)是消息队列【RocketMQ】版提供的⼀种严格按照顺序来发布和消费的消息。

顺序发布和顺序消费是指对于指定的⼀个Topic,⽣产者按照⼀定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息⼀定会先被客户端接收到。

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序
  2. 消息被存储时保持和发送的顺序⼀致
  3. 消息被消费时保持和存储的顺序⼀致

发送时保持顺序意味着对于有顺序要求的消息,⽤户应该在同⼀个线程中采⽤同步的⽅式发送。存储保持和发送的顺序⼀致则要求在同⼀线程中被发送出来的消息A和B,存储时在空间上A⼀定在B之前。⽽消费保持和存储⼀致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
如下图所示:
顺序消息模型

对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发⽣的顺序):

  • 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去
  • 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证
    • a1、b1、b2、a2、a3、b3是可以接受的
    • a1、a2、b1、b2、a3、b3也是可以接受的
    • a1、a3、b1、b2、a2、b3是不能接受的
  • 消费时保证顺序的简单⽅式就是“什么都不做”,不对收到的消息的顺序进⾏调整,即只要⼀个分区的消息只由⼀个线程处理即可;当然,如果a、b在⼀个分区中,在收到消息后也可以将他们拆分
    到不同线程中处理,不过要权衡⼀下收益

实现

顺序消息分为全局顺序消息和分区顺序消息

  • 全局顺序:对于指定的⼀个【Topic】,所有消息按照严格的先⼊先出(First In First Out,简称FIFO)的顺序进⾏发布和消费。
  • 分区顺序:对于指定的⼀个【Topic】,所有消息根据【Sharding Key】进⾏区块分区。 同⼀个分区内的消息按照严格的FIFO顺序进⾏发布和消费。【Sharding Key】是顺序消息中⽤来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

顺序消息图例
上图中的分区key就可以使用orderId

样例代码

生产者

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        CountDownLatch latch = new CountDownLatch(2);
        Thread t1 = new Thread(new Operation(producer, 0, latch));
        Thread t2 = new Thread(new Operation(producer, 1, latch));
        t1.start();
        t2.start();
        latch.await();
        producer.shutdown();
    }
}

class Operation implements Runnable {

    private DefaultMQProducer producer;
    private static String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    private int orderId;
    private CountDownLatch latch;

    public Operation(DefaultMQProducer producer, int orderId, CountDownLatch latch) {
        this.producer = producer;
        this.orderId = orderId;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                // Create a message instance, specifying topic, tag and message body.
                if (i % 2 == orderId) {
                    Message msg = new Message("TopicOrder", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
//                            既可以是不同的队列 也可以是同一个队列
//                            return mqs.get(1);
                        }
                    }, orderId);

                    System.out.printf("%s%n", sendResult);
                }
            }
            latch.countDown();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

通过实现MessageQueueSelector将同一个orderId的消息分配到一个队列上,保证了发送方的顺序性,同时,这样也保证了存储在broker上的数据的顺序性。

消费者

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group_name");
        consumer.setNamesrvAddr("localhost:9876");
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//        consumer.subscribe("TopicOrder", "TagA || TagB || TagC || TagD");
        consumer.subscribe("TopicOrder", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                for (MessageExt me : msgs) {
                    System.out.printf(Thread.currentThread().getName() + " Queue: " + me.getQueueId() + " Receive New Messages: " + new String(me.getBody()) + "%n");
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

MessageListenerOrderlyMessageListenerConcurrently区别?

  • MessageListenerOrderly:有序消费,同⼀队列的消息同⼀时刻只能⼀个线程消费,可保证消息在同⼀队列严格有序消费
  • MessageListenerConcurrently:并发消费

此处需要注意的是,对于顺序消息,如果在消费端消费失败的情况下,顺序消息会不断进行重试,无上限,这样会导致消息消费阻塞,所以需要在消费端进行消费失败的最终处理。

广播消息

这里需要先了解两种消费模式:

  1. 集群消费模式(默认)
  2. 广播消费模式

集群消费模式

适⽤于消费端集群化部署,每条消息只需要被处理⼀次的场景。此外,由于消费进度在服务端维护,可靠性更⾼。具体消费示例如下图所示:
集群消费模式

注意事项:
集群消费模式下,不保证每⼀次失败重投的消息路由到同⼀台机器上。消息虽然会被分配到指定的一个队列上,但是队列与consumer的订阅关系可能发生改变。

广播消费模式

适⽤于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。

广播模式下,每个消费者会订阅所有队列,并且在消费者本地自己维护每个队列的offset

注意事项:

  • ⼴播消费模式下不⽀持顺序消息。
  • ⼴播消费模式下不⽀持重置消费位点。
  • 每条消息都需要被相同订阅逻辑的多台机器处理。
  • ⼴播模式下服务端不维护消费进度,消费进度在客户端维护
  • ⼴播模式下,【RocketMQ】保证每条消息⾄少被每台客户端消费⼀次,但是并不会重投消费失败的消息,因此业务⽅需要关注消费失败的情况。
  • ⼴播模式下,客户端每⼀次重启都会从最新消息消费。客户端在被停⽌期间发送⾄服务端的消息将会被⾃动跳过,请谨慎选择。
  • ⼴播模式下,每条消息都会被⼤量的客户端重复处理,因此推荐尽可能使⽤集群模式。

代码实现

广播模式的生产者没有特殊逻辑,唯一不同的是消费者在设置消费模式时需要显式的设置为广播消费模式(默认是集群模式):

consumer.setMessageModel(MessageModel.BROADCASTING);

延时消息

延时消息(延迟消息)⽤于指定消息发送到【RocketMQ】的服务端后,延时⼀段时间才被投递到消费者进⾏消费(例如3秒后才被消费),适⽤于解决⼀些消息⽣产和消费有时间窗⼝要求的场景,或者通过消息触发延迟任务的场景,类似于延迟队列。

场景案例:⽤户下了⼀个订单之后,需要在指定时间内(例如30分钟)进⾏⽀付,在到期之前可以发送⼀个消息提醒⽤户进⾏⽀付。

实现这类需求通常有两种⽅式:

  • 轮询定时任务:给定周期内扫描所有未⽀付的订单,查看时间是否到期。
  • 延时消息:订单创建的时候发送⼀条N分钟到期的信息,⼀旦消息消费后便可判断订单是否可以取消。

第一种方式如果轮训频率较高的话,会导致数据库压力增大;如果频率低的话,则会导致通知不精准。所以使用延时消息是更好的方式。

延时消息过程

⼀些消息中间件的Broker端内置了延迟消息⽀持的能⼒,核⼼实现思路都是⼀样:将延迟消息通过⼀个临时存储进⾏暂存,到期后才投递到⽬标Topic中。

如下图所示:
延时消息演示

  1. producer要将⼀个延迟消息发送到目标【Topic】中
  2. Broker判断这是⼀个延迟消息后,将其通过临时存储进⾏暂存。
  3. Broker内部通过⼀个延迟服务delay service检查消息是否到期,将到期的消息投递到⽬标【Topic】中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同。
  4. 消费者消费⽬标【topic】中的延迟投递的消息。

显然,临时存储模块和延迟服务模块,是延迟消息实现的关键。上图中,临时存储和延迟服务都是在Broker内部实现,对业务透明。

对于临时存储一般有几个方面的要求:

  1. ⾼性能
    写⼊延迟要低,MQ的⼀个重要作⽤是削峰填⾕,在选择临时存储时,写⼊性能必须要⾼,关系型数据库通常不满⾜需求。
  2. ⾼可靠
    延迟消息写⼊后,不能丢失,需要进⾏持久化,并进⾏备份。
  3. 支持排序
    对于延迟消息而言是需要按照时间进⾏排序的。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进⾏排序。
    例如先发⼀条延迟10s的消息,再发⼀条延迟5s的消息,那么后发送的消息需要被先消费。
  4. ⽀持⻓时间保存
    ⼀些业务的延迟消息,需要延迟⼏个⽉,甚⾄更⻓,所以延迟消息必须能⻓时间保留。不过通常不建议延迟太⻓时间,存储成本⽐较⼤,且业务逻辑可能已经发⽣变化,已经不需要消费这些消息。

滴滴DDMQ延迟消息实现

目前,延时消息业界较为成熟的有滴滴开源的消息中间件DDMQ,底层消息中间件的基础上加了⼀层代理,独⽴部署延迟服务模块,使⽤rocksdb进⾏临时存储。rocksdb是⼀个⾼性能的KV存储,并⽀持排序。

此时对于延迟消息的流转如下图所示:
滴滴DDMQ延时消息实现

  1. ⽣产者将发送给producer proxy,代理判断是延迟消息,将其投递到⼀个缓冲【Topic】中;
  2. delay service启动消费者,⽤于从缓冲【topic】中消费延迟消息,以时间为key,存储到rocksdb中;
  3. delay service判断消息到期后,将其投递到⽬标【Topic】中。
  4. 消费者消费⽬标【topic】中的数据

这种⽅式的好处是,因为delay service的延迟投递能⼒是独⽴于broker实现的,不需要对broker做任何改造,对于任意MQ类型都可以提供⽀持延迟消息的能⼒,例如DDMQ对RocketMQ、Kafka都提供了秒级精度的延迟消息投递能⼒,RocketMQ虽然⽀持延迟消息,但不⽀持秒级精度。

RocketMQ中的延迟消息

RocketMQ不⽀持任意时间的延时,只⽀持以下⼏个固定的延时等级:

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ的延迟等级可以进⾏修改,以满⾜⾃⼰的业务需求,可以修改/添加新的level。

实现原理

延迟队列的核⼼思路是:所有的延迟消息由producer发出之后,都会存放到同⼀个topic(SCHEDULE_TOPIC_XXXX)下,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可⻅,从⽽被consumer消费。

⽣产者在发送延迟消息⾮常简单,只需要设置⼀个延迟级别即可,注意不是具体的延迟时间,如:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    int totalMessagesToSend = 2;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
        //注意这里的3指的不是3s,而是等级
        message.setDelayTimeLevel(3);
        // Send the message
        producer.send(message);
    }

    producer.shutdown();
}

延迟消息在RocketMQ Broker端的流转如下图所示:
rocketMQ延迟消息图示
可以看到,总共有6个步骤,下⾯会对这6个步骤进⾏详细的讲解:

  1. 修改消息Topic名称和队列信息
  2. 转发消息到延迟主题SCHEDULE_TOPIC_XXXXCosumeQueue
  3. ScheduleMessageService定期消费SCHEDULE_TOPIC_XXXX消息
  4. 将信息重新存储到CommitLog
  5. 将消息投递到⽬标【Topic】中
  6. 消费者消费⽬标【Topic】中的数据
第⼀步:修改消息Topic名称和队列信息

RocketMQ Broker端在存储⽣产者写⼊的消息时,⾸先都会将其写⼊到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到⽬标Topic的指定队列ConsumeQueue中。

由于消息⼀旦存储到ConsumeQueue中,消费者就能消费到,⽽延迟消息不能被⽴即消费,所以这⾥将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。

同时,还会将消息原来要发送到的⽬标【Topic】和队列信息存储到消息的属性中。
CommitLog#putMessage

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    ...
        /**
         * 如果是延迟消息
         */
        if (msg.getDelayTimeLevel() > 0) {
            // 如果设置的级别超过了最大级别,重置延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 修改Topic的投递目标为内部主题SCHEDULE_TOPIC_XXXX
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 根据delayLevel,确定将消息投递到SCHEDULE_TOPIC_XXXX内部的哪个队列中
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            // 记录原始topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 更新消息投递目标为SCHEDULE_TOPIC_XXXX和queueId
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    ...
}
第⼆步:转发消息到延迟主题的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进⾏的。在转发过程中,会对延迟消息进⾏特殊处理,主要是计算这条延迟消息需要在什么时候进⾏投递。

投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间

需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:
consumeQueue存储单元

  • Commit Log Offset:记录在CommitLog中的位置。
  • Size:记录消息的⼤⼩
  • Message Tag HashCode:记录消息Tag的哈希值,⽤于消息过滤。对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode⽅法返回⼀个int型,只占⽤4个字节,⽽这⾥Message Tag HashCode字段确设计成8个字节的原因。
    CommitLog#checkMessageAndReturnSize

    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
                                                   final boolean readBody) {
      ...
      // Timing message processing
      {
          // 如果消息需要投递到延迟主题SCHEDULE_TOPIC_XXX中
          String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
          if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
              int delayLevel = Integer.parseInt(t);
    
              if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                  delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
              }
              //如果延迟级别大于0,计算目标投递时间,并将其当做tag哈希值
              if (delayLevel > 0) {
                  tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                          storeTimestamp);
              }
          }
      }
      ...
    }
    
第三步:delay service消费SCHEDULE_TOPIC_XXXX消息

Broker内部有⼀个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到⽬标Topic中。

ScheduleMessageService在启动时,其会创建⼀个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask每个TimerTask负责⼀个延迟级别的消费与投递
ScheduleMessageService#start

public void start() {
    if (started.compareAndSet(false, true)) {
        //1 创建定时器Timer
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        //2 针对每个延迟级别,创建一个TimerTask
        //2.1 迭代每个延迟级别:delayLevelTable是一个Map记录了每个延迟级别对应的延迟时间
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            //2.2 获得每个每个延迟级别的level和对应的延迟时间
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            // 2.3 针对每个级别创建一个对应的TimerTask
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        ...
    }
}

再往下看:
ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
    // 获取需要延迟的queue
    ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // commitLogOffset
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // messageSize
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tagsCode
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                    tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 调整投递时间,如果当前时间+当前等级对应延迟时间<队列中消息的投递时间
                    // 就调整本次投递时间为当前时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 这里是在将下一次需要投递的指针向后推,例如:本次投递了一条,那么下一次投递应该从本次投递的offset向后推1
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        // 当前消息重新投递
                        MessageExt msgExt =
                                ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // 投递消息
                                PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                if (putMessageResult != null
                                        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    // 投递成功就再取下一条
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                            "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                            msgExt.getTopic(), msgExt.getMsgId());
                                    // 如果消息投递失败直接打印日志并进行下一次定时
                                    ScheduleMessageService.this.timer.schedule(
                                            new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                            nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // 记录下一次执行时间和下一次执行的consumeQueue的offset和下一次启动的时间countdown
                        // 从这里可以看出,因为消息是使用时间排序的,所以只要有一条时间还没到,后面的就不继续进行了,而是直接到下一次定时开始
                        ScheduleMessageService.this.timer.schedule(
                                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                bufferCQ.release();
            }
        }
        else {
            // bufferCQ == null
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                // 修改下一次定时启动的offset
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    // 记录下一次定时从最小offset开始
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
}
第四步:将信息重新存储到CommitLog中投递到真实Topic

这里接着上一步,ScheduleMessageService将消息重新投递,执行CommitLog#putMessage方法。此时,消息中的DELAY属性已经去除,相当于一个普通消息,该消息会重新写入Commit Log Mapped File,并dispatch到相应的Consume Queue当中。

第五步:消费者消费⽬标Topic中的数据

延迟消息存放

从刚刚的步骤中我们知道延迟消息在RocketMQ中会保存两次,延迟消息存放的结构如下图所示:
延时消息存储结构

另外,代码中我们可以看到ScheduleMessageService维护了一个offsetTable,该数据也会落地到磁盘中,记录每个级别的延迟消息已经消费到什么位置,该文件存储在~/store/config/delayOffset.json文件内。
格式为:

// key为延时级别
// value为消费的offset
{
    "offsetTable":{
        3:202,
        4:2,
        5:2,
        6:2,
        7:2,
        8:2,
        9:2,
        10:2,
        11:2
 }

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:

  1. 一次批量消息一定topic都相同
  2. 每条消息的waitStoreMsgOK属性相同,建议都为true
  3. 批量消息不支持延时推送
  4. 批量消息最大一次性发送包大小4M,这个是作为producer一次性发送的最大值,不只是批量消息

根据第4点,我们在发送批量消息时最好加入截断器:

class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // 计算消息大小
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // for log overhead

            if (tmpSize > SIZE_LIMIT) { // 如果单条消息超过4M
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) { // 计算如果超过4M 进行截断
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

发送批量消息

List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));

ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    List<Message>  listItem = splitter.next();
    SendResult sendResult = producer.send(listItem);
}

消息过滤

具体来说,这不是一种消息类型,每一种消息都可以添加消息过滤条件。其应用范围比较广泛,一般在电商业务中经常使用。
消息过滤图例

消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤⼜分为TAG和SQL92模式。

基于Tag过滤

发送消息时我们会为每⼀条消息设置TAG标签,同⼀⼤类中的消息放在⼀个主题TOPIC下,但是如果进⾏分类我们则可以根据TAG进⾏分类,每⼀类消费者可能不是关系某个主题下的所有消息,我们就可以通过TAG进⾏过滤,订阅关注的某⼀类数据。

producer创建消息时添加TAGA标签:

Message msg = new Message("TopicTest", "TAGA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

consumer消费过程中,只消费自己需要的tag的消息:

consumer.subscribe("TopicTest", "TAGA || TAGB");

基于SQL过滤

SQL92表达式消息过滤,是通过消息的属性运⾏SQL过滤表达式进⾏条件匹配,消息发送时需要设置⽤户的属性,putUserProperty⽅法设置属性。

producer添加用户属性:

Message msg = new Message("TopicTest" /* Topic */, tag /* Tag */, ("RocketMQ消息测试,消息的TAG="+tag+  ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("age", i+"");

consumer中使用SQL过滤:

consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));

⽀持的语法:

  1. 数值⽐较, 如 > , >= , < , <= , BETWEEN , = ;
  2. 字符⽐较, 如 = , <> , IN ;
  3. IS NULL or IS NOT NULL ;
  4. 逻辑连接符 AND , OR , NOT ;

⽀持的类型:

  1. 数值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必须⽤单引号;
  3. NULL , 特殊常数;
  4. 布尔值, TRUE or FALSE ;

类过滤模式

该过滤模式在低版本中使用,现已抛弃。通过定义消息过滤类的接⼝实现消息过滤。

事务消息

RocketMQ提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。

这里需要注意的是,RocketMQ的事务消息提供的不是完整的分布式事务过程,该过程的完整实现需要业务和RocketMQ事务消息配合完成

概念

  • 事务消息:RocketMQ提供类似X/Open XA的分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终⼀致。
  • 半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了RocketMQ服务端,从此时到服务端收到⽣产者对该消息的⼆次确认指端时间,该消息被标记成【暂不能投递】状态,处于该种状态下的消息即半事务消息。相当于事务的begin transaction过程
  • 消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,RocketMQ服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

流程

事务消息交互流程如下图所示:
事务消息发送接收流程

事务消息发送步骤如下:(对应图中1、2、3、4)

  1. 发送⽅将半事务消息发送⾄RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向发送⽅返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送⽅开始执⾏本地事务逻辑。
  4. 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅⽅将不会接受该消息。

事务消息回查步骤如下:(对应图中5、6、7)

  1. 在断⽹或者是应⽤重启的特殊情况下,上述【步骤4】提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  3. 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照【步骤4】对半事务消息进⾏操作。

从这个图中我们可以得知,通过RocketMQ形式的分布式事务类型,能保证最终结果的一致性,由于两个服务的控制不是在同一个事务内完成的,所以没有办法做到强一致性。

注意事项

  1. 事务消息不⽀持延时消息和批量消息
  2. 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为【15次】,但是⽤户可以通过Broker配置⽂件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话(N = transactionCheckMax),则Broker将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener类来修改这个⾏为。
  3. 事务消息将在Broker配置⽂件中的参数transactionTimeout这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改
    变这个限制,该参数优先于transactionTimeout参数。
  4. 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
  5. 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
  6. 因为事务消息有消息回查机制,也就是RocketMQ服务端会反向请求生产者客户端,是通过producer的ID回查,所以事务消息的producerId不能与其他消息的producerId相同。

事务消息原理

事务消息原理

HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)

  1. 事务消息替换主题,保存原主题和队列信息
  2. 半消息对Consumer不可⻅,不会被投递

OP消息:RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)

  • Rollback:只做记录
  • Commit:根据备份信息重新构造消息并投递

消息回查:先从RMQ_SYS_TRANS_HALF_TOPIC轮训消息获得超过时间的HALF消息,然后通过producerId回查生产者,生产者发送OP消息

事务消息使用

要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如
下:

public interface TransactionListener {
    /**
     * HALF消息发送给服务端之后,执行本地的事务
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 当HALF消息发送个服务端之后,RocketMQ没有收到二次确认消息超过一段时间后,RocketMQ回查producer的事务状态
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,preparedcommited

当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,该方法返回LocalTransactionState,这个状态有3个值:

  • LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
  • LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
  • LocalTransactionState.UNKNOW:未知,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。

executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会隔⼀段时间调⽤⼀次checkLocalTransaction,如果checkLocalTransaction方法继续返回UNKNOW,那么再过一段时间会再次调用checkLocalTransaction,直到返回COMMIT或者ROLLBACK。如果一直不返回COMMIT或ROLLBACK并且重试超过15次之后,该事务消息将会被抛弃。

每次重试的间隔时间和最大重试次数都是由Broker内配置的:
BrokerConfig

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

/**
 * Transaction message check interval.
 */
@ImportantField
private long transactionCheckInterval = 60 * 1000;
正文到此结束