RocketMQ 顺序消息Demo | Eddie'Blog
RocketMQ 顺序消息Demo

RocketMQ 顺序消息Demo

eddie 254 2021-07-29

目录

代码讲解

准备工作

环境准备

RocketMQ双主双从高可用部署

图片.png

依赖引入

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

顺序生产者

package com.example.rocketmq.producer.orderly;

import com.example.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @author eddie.lee
 */
public class Producer {

    public static void main(String[] args) throws MQClientException {

        String groupName = "test_orderly_consumer_name";

        DefaultMQProducer producer = new DefaultMQProducer(groupName);

        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        producer.start();

        // 标记=1
        forSendMsg(producer, 1);
        // 标记=2
        forSendMsg(producer, 2);
        // 标记=3
        forSendMsg(producer, 3);

        // 关闭
        producer.shutdown();

    }

    private static void forSendMsg(DefaultMQProducer producer, int mark) {

        int num = 5;

        String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        try {
            for (int i = 0; i < num; i++) {
                // 时间戳
                String body = dateStr + " Hello RocketMQ = " + i;
                // 参数: topic tag message
                Message message = new Message("test_order_topic", "TagA", "KEY" + i, body.getBytes());
                // 发送数据:如果使用顺序消费,则必须自己实现MessageQueueSelector, 保证消息进入同一个队列
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        System.err.println("id: " + id);
                        return list.get(id);
                    }
                    // mark 是队列下标
                }, mark);

                System.err.println(sendResult + ", body:" + body);
            }

        } catch (MQBrokerException | MQClientException | RemotingException | InterruptedException e) {
            e.printStackTrace();
            // 【Bug】Either re-interrupt this method or rethrow the "InterruptedException"
            Thread.currentThread().interrupt();
        }

    }

}

Console

id: 1
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F96C0000, offsetMsgId=C0A8086C00002A9F0000000000008ABF, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=1], queueOffset=0], body:2021-07-29 14:00:56 Hello RocketMQ = 0
id: 1
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9840001, offsetMsgId=C0A8086C00002A9F0000000000008B98, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=1], queueOffset=1], body:2021-07-29 14:00:56 Hello RocketMQ = 1
id: 1
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9890002, offsetMsgId=C0A8086C00002A9F0000000000008C71, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=1], queueOffset=2], body:2021-07-29 14:00:56 Hello RocketMQ = 2
id: 1
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F98E0003, offsetMsgId=C0A8086C00002A9F0000000000008D4A, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=1], queueOffset=3], body:2021-07-29 14:00:56 Hello RocketMQ = 3
id: 1
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9930004, offsetMsgId=C0A8086C00002A9F0000000000008E23, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=1], queueOffset=4], body:2021-07-29 14:00:56 Hello RocketMQ = 4
id: 2
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9990005, offsetMsgId=C0A8086C00002A9F0000000000008EFC, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=2], queueOffset=0], body:2021-07-29 14:00:56 Hello RocketMQ = 0
id: 2
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F99E0006, offsetMsgId=C0A8086C00002A9F0000000000008FD5, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=2], queueOffset=1], body:2021-07-29 14:00:56 Hello RocketMQ = 1
id: 2
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9A20007, offsetMsgId=C0A8086C00002A9F00000000000090AE, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=2], queueOffset=2], body:2021-07-29 14:00:56 Hello RocketMQ = 2
id: 2
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9A70008, offsetMsgId=C0A8086C00002A9F0000000000009187, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=2], queueOffset=3], body:2021-07-29 14:00:56 Hello RocketMQ = 3
id: 2
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9AD0009, offsetMsgId=C0A8086C00002A9F0000000000009260, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=2], queueOffset=4], body:2021-07-29 14:00:56 Hello RocketMQ = 4
id: 3
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9B4000A, offsetMsgId=C0A8086C00002A9F0000000000009339, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=3], queueOffset=0], body:2021-07-29 14:00:56 Hello RocketMQ = 0
id: 3
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9B8000B, offsetMsgId=C0A8086C00002A9F0000000000009412, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=3], queueOffset=1], body:2021-07-29 14:00:56 Hello RocketMQ = 1
id: 3
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9BF000C, offsetMsgId=C0A8086C00002A9F00000000000094EB, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=3], queueOffset=2], body:2021-07-29 14:00:56 Hello RocketMQ = 2
id: 3
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9C6000D, offsetMsgId=C0A8086C00002A9F00000000000095C4, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=3], queueOffset=3], body:2021-07-29 14:00:56 Hello RocketMQ = 3
id: 3
SendResult [sendStatus=SEND_OK, msgId=C0A8FF1A0CB018B4AAC29333F9CC000E, offsetMsgId=C0A8086C00002A9F000000000000969D, messageQueue=MessageQueue [topic=test_order_topic, brokerName=broker-a, queueId=3], queueOffset=4], body:2021-07-29 14:00:56 Hello RocketMQ = 4
14:00:57.159 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.240:10911] result: true
14:00:57.163 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.246:10911] result: true
14:00:57.163 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.247:10911] result: true
14:00:57.163 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.108:10909] result: true
14:00:57.163 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.108:10911] result: true
14:00:57.163 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.8.240:9876] result: true
Disconnected from the target VM, address: '127.0.0.1:52919', transport: 'socket'

Process finished with exit code 0

顺序消费者

package com.example.rocketmq.producer.orderly;

import com.example.rocketmq.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author eddie.lee
 */
public class Consumer {

    public static void main(String[] args) throws MQClientException {

        Consumer c1 = new Consumer();

    }

    public Consumer() throws MQClientException {

        String groupName = "test_orderly_consumer_name";

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        /**
         * 设置 consumer 第一次启动是从队列头部开始消费还是队列尾巴开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅的主题,以及过滤标签内容
        consumer.subscribe("test_order_topic", "TagA");
        // 注册监听
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.err.println("Consumer Started...");

    }

    /**
     * MessageListenerOrderly: 一个线程监听一个队列
     */
    class Listener implements MessageListenerOrderly {

        private Random random = new Random();

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            // 设置自动提交
//            consumeOrderlyContext.setAutoCommit(true);

            for (MessageExt me : list) {
                System.err.println(me + ", content: " + new String(me.getBody()));
                try {
                    // 模拟业务逻辑处理...
                    TimeUnit.SECONDS.sleep(random.nextInt(4) + 1);
                } catch (Exception e) {
                    e.printStackTrace();
//                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
}

Console

Consumer Started...
MessageExt [queueId=1, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1627538456941, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458496, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008ABF, commitLogOffset=35519, bodyCRC=138404118, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=KEY0, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F96C0000, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 48], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 0
MessageExt [queueId=2, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1627538456985, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458524, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008EFC, commitLogOffset=36604, bodyCRC=138404118, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=KEY0, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9990005, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 48], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 0
MessageExt [queueId=3, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1627538457012, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458549, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000009339, commitLogOffset=37689, bodyCRC=138404118, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=KEY0, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9B4000A, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 48], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 0
MessageExt [queueId=1, storeSize=217, queueOffset=1, sysFlag=0, bornTimestamp=1627538456964, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458502, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008B98, commitLogOffset=35736, bodyCRC=2134430080, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=KEY1, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9840001, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 49], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 1
MessageExt [queueId=2, storeSize=217, queueOffset=1, sysFlag=0, bornTimestamp=1627538456990, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458527, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008FD5, commitLogOffset=36821, bodyCRC=2134430080, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=KEY1, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F99E0006, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 49], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 1
MessageExt [queueId=2, storeSize=217, queueOffset=2, sysFlag=0, bornTimestamp=1627538456994, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458532, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F00000000000090AE, commitLogOffset=37038, bodyCRC=1714520122, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=KEY2, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9A20007, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 50], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 2
MessageExt [queueId=3, storeSize=217, queueOffset=1, sysFlag=0, bornTimestamp=1627538457016, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458556, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000009412, commitLogOffset=37906, bodyCRC=2134430080, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=KEY1, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9B8000B, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 49], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 1
MessageExt [queueId=1, storeSize=217, queueOffset=2, sysFlag=0, bornTimestamp=1627538456969, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458507, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008C71, commitLogOffset=35953, bodyCRC=1714520122, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=KEY2, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9890002, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 50], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 2
MessageExt [queueId=3, storeSize=217, queueOffset=2, sysFlag=0, bornTimestamp=1627538457023, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458561, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F00000000000094EB, commitLogOffset=38123, bodyCRC=1714520122, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=KEY2, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9BF000C, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 50], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 2
MessageExt [queueId=1, storeSize=217, queueOffset=3, sysFlag=0, bornTimestamp=1627538456974, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458513, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008D4A, commitLogOffset=36170, bodyCRC=288796844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, KEYS=KEY3, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F98E0003, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 51], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 3
MessageExt [queueId=2, storeSize=217, queueOffset=3, sysFlag=0, bornTimestamp=1627538456999, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458537, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000009187, commitLogOffset=37255, bodyCRC=288796844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, KEYS=KEY3, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9A70008, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 51], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 3
MessageExt [queueId=1, storeSize=217, queueOffset=4, sysFlag=0, bornTimestamp=1627538456979, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458517, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000008E23, commitLogOffset=36387, bodyCRC=257041679, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, KEYS=KEY4, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9930004, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 52], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 4
MessageExt [queueId=2, storeSize=217, queueOffset=4, sysFlag=0, bornTimestamp=1627538457005, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458543, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F0000000000009260, commitLogOffset=37472, bodyCRC=257041679, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, KEYS=KEY4, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9AD0009, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 52], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 4
MessageExt [queueId=3, storeSize=217, queueOffset=3, sysFlag=0, bornTimestamp=1627538457030, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458568, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F00000000000095C4, commitLogOffset=38340, bodyCRC=288796844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, KEYS=KEY3, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9C6000D, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 51], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 3
MessageExt [queueId=3, storeSize=217, queueOffset=4, sysFlag=0, bornTimestamp=1627538457036, bornHost=/192.168.8.41:52927, storeTimestamp=1627538458574, storeHost=/192.168.8.108:10911, msgId=C0A8086C00002A9F000000000000969D, commitLogOffset=38557, bodyCRC=257041679, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_order_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, KEYS=KEY4, UNIQ_KEY=C0A8FF1A0CB018B4AAC29333F9CC000E, WAIT=true, TAGS=TagA}, body=[50, 48, 50, 49, 45, 48, 55, 45, 50, 57, 32, 49, 52, 58, 48, 48, 58, 53, 54, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 61, 32, 52], transactionId='null'}], content: 2021-07-29 14:00:56 Hello RocketMQ = 4

# RocketMQ