分布式事务 - Rocketmq | Eddie'Blog
分布式事务 - Rocketmq

分布式事务 - Rocketmq

eddie 554 2020-11-30

目录

准备工作

基础代码

MQ

  • 原理、流程与本地消息表
  • 不同点
    • 本地消息表改为MQ
    • 定时任务改为MQ的消费者
  • 不依赖定时任务, 基于MQ更高效、更可靠
  • 适合于公司内的系统
  • 不同公司之间无法基于MQ, 本地消息表更合适

DB1 --> 支付系统 --> 支付成功消息 --> 消费者 --> DB2 (订单更新为已支付)

RocketMQ的安装

http://rocketmq.apache.org/docs/quick-start/

unzip rocketmq-all-4.7.1-source-release.zip
mv rocketmq-all-4.7.1-source-release rocketmq-all-4.7.1
cd rocketmq-all-4.7.1/ && mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1

Linux

Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

Broker

nohup sh bin/mqbroker -n 192.168.8.246:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

问题汇总:

一、内存问题
[eddie@tracker rocketmq-4.7.1]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/hs_err_pid22425.log


修改内存
vim /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m"

二、SpringBoot启动提示"close the connection to remote address[] result: true" <br> 和 The broker[tracker, 172.19.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876  综合两个提示,明显没有找到IP地址

开始我是用两台服务器分别安装 mqbroker 和 namesrv 后来只能安装在一台,在使用下面方法解决:

echo 'brokerIP1=192.168.8.246' > conf/broker.properties
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.properties autoCreateTopicEnable=true &



代码部分

Maven依赖

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.7.1</version>  <!-- Linux安装RocketMQ版本一致 -->
</dependency>

配置类

package com.example.tccdemo.config;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author eddie.lee
 * @ProjectName tcc-demo
 * @Package com.example.tccdemo.config
 * @ClassName RocketMQConfig
 * @description http://rocketmq.apache.org/docs/simple-example/
 * @date created in 2020-11-30 14:14
 * @modified by
 */
@Configuration
public class RocketMQConfig {

    /**
     * 生产者
     * Send Messages Synchronously 同步发送消息
     *
     * @return producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer producer() {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("paymentGroup");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.8.246:9876");
        return producer;
    }

    /**
     * 消费者
     *
     * @param messageListener
     * @return consumer
     * @throws MQClientException
     * @description @Qualifier("messageListener") 之后需要单独写一个Bean注入
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.8.246:9876");
        // Subscribe one more more topics to consume.
        consumer.subscribe("payment", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(messageListener);
        return consumer;
    }
}

支付消息队列

com.example.tccdemo.service.PaymenService

新增方法

    /**
     * 支付接口 (消息队列:生产者)
     *
     * @param userId
     * @param orderId
     * @param amount
     * @return 0:代表成功、1: 用户不存在、2: 余额不足
     */
    @Transactional(transactionManager = "tm246", rollbackFor = Exception.class)
    public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
        /**
         * 账户的扣减操作
         */
        // 账户是否存在
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if (accountA == null) {
            return 1;
        }
        // 账户的余额小于你扣减的金额
        if (accountA.getBalance().compareTo(amount) < 0) {
            return 2;
        }
        // 账户余额减去传入的金额
        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);

        Message message = new Message();
        // 必须项:分类
        message.setTopic("payment");
        message.setKeys(orderId + "");
        message.setBody("订单已支付".getBytes());

        try {
            SendResult result = producer.send(message);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                return 0;
            } else {
                // 如果状态不是 SEND_OK 就直接抛出异常, 回滚整个方法. 保证原子性
                throw new Exception("消息发送失败!");
            }

        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
            // 必需抛出, 不能让方法正常结束, 不然就不会触发事务的回滚
            throw e;
        }
    }

@Qualifier("messageListener") 单独写一个Bean注入

package com.example.tccdemo.consumer;

import com.example.tccdemo.db247.dao.OrderMapper;
import com.example.tccdemo.db247.model.Order;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

import static org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
import static org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.RECONSUME_LATER;

/**
 * @author eddie.lee
 * @ProjectName tcc-demo
 * @Package com.example.tccdemo.consumer
 * @ClassName ChangeOrderStatus
 * @description
 * @date created in 2020-11-30 15:14
 * @modified by
 */
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {

    @Resource
    private OrderMapper orderMapper;

    /**
     * @param list                       取得多个消息
     * @param consumeConcurrentlyContext
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.size() == 0) {
            // 返回消费成功
            return CONSUME_SUCCESS;
        }
        /**
         * 取到消费是一个集合
         * 比如:获取了两条消息队列的消息,
         *      第一个消息消费成功
         *      第二个消息消费失败, 返回再次消费(RECONSUME_LATER)
         *      但是第一个消息是成功的
         *
         *      MessageListenerConcurrently: 不建议抛出异常,而不是在以下情况下返回ConsumeConcurrentlyStatus.RECONSUME_LATER
         *                                    DefaultMQPushConsumer.consumeMessageBatchMaxSize = 1,您可以在此处修改
         *
         *      按 MessageListenerConcurrently 意思,就是每次取一条,所以不用担心第二条失败而导致第一条出现问题
         */
        for (MessageExt item : list) {
            String orderId = item.getKeys();
            byte[] body = item.getBody();
            String msg = new String(body);
            System.out.println("msg=" + msg);

            Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
            if (order == null) {
                // 返回再次消费
                return RECONSUME_LATER;
            }
            try {
                // 已支付
                order.setOrderStatus(1);
                order.setUpdateTime(new Date());
                // 更新操作时间
                order.setUpdateUser(0);
                orderMapper.updateByPrimaryKey(order);
            } catch (Exception e) {
                e.printStackTrace();
                return RECONSUME_LATER;
            }
        }
        // 返回消费成功
        return CONSUME_SUCCESS;
    }

}

测试

控制层

    @RequestMapping("/payment")
    private String payment(int userId, int orderId, BigDecimal amount) throws Exception {
        int result = paymenService.pamentMQ(userId, orderId, amount);
        return "支付结果:" + result;
    }
GET

http://localhost:8080/payment?userId=1&orderId=100&amount=200

数据库变化

account_a 扣减金额

t_order 状态更改=1

总结

  • 请求接口
    • com.example.tccdemo.controller.PaymenController#payment
  • 生产者部分
    • 支付接口:com.example.tccdemo.service.PaymenService#pamentMQ
      • 扣减操作
      • 订单号和主题消息“订单已支付”,放入消息队列 (主要部分是MQ出错的逻辑判断)
  • 消费者部分
    • com.example.tccdemo.config.RocketMQConfig#consumer -- @Qualifier("messageListener")
    • 编写 messageListener 的 @Bean -- com.example.tccdemo.consumer.ChangeOrderStatus#consumeMessage
      • List中获取订单号(OrderId)、消息(msg=订单已支付)
      • 通过获取到OrderId查询订单,修改支付状态 (关键点:ConsumeConcurrentlyStatus的枚举状态)