目录
准备工作
MQ
- 原理、流程与本地消息表
- 不同点
- 本地消息表改为MQ
- 定时任务改为MQ的消费者
- 不依赖定时任务, 基于MQ更高效、更可靠
- 适合于公司内的系统
- 不同公司之间无法基于MQ, 本地消息表更合适
DB1 --> 支付系统 --> 支付成功消息 --> 消费者 --> DB2 (订单更新为已支付)
RocketMQ的安装
http://rocketmq.apache.org/docs/quick-start/
unzip rocketmq-all-4.7.1-source-release.zip
mv rocketmq-all-4.7.1-source-release rocketmq-all-4.7.1
cd rocketmq-all-4.7.1/ && mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
Linux
Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
Broker
nohup sh bin/mqbroker -n 192.168.8.246:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
问题汇总:
一、内存问题
[eddie@tracker rocketmq-4.7.1]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/hs_err_pid22425.log
修改内存
vim /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim /opt/rocketmq-all-4.7.1/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m"
二、SpringBoot启动提示"close the connection to remote address[] result: true" <br> 和 The broker[tracker, 172.19.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876 综合两个提示,明显没有找到IP地址
开始我是用两台服务器分别安装 mqbroker 和 namesrv 后来只能安装在一台,在使用下面方法解决:
echo 'brokerIP1=192.168.8.246' > conf/broker.properties
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.properties autoCreateTopicEnable=true &
代码部分
Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version> <!-- Linux安装RocketMQ版本一致 -->
</dependency>
配置类
package com.example.tccdemo.config;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author eddie.lee
* @ProjectName tcc-demo
* @Package com.example.tccdemo.config
* @ClassName RocketMQConfig
* @description http://rocketmq.apache.org/docs/simple-example/
* @date created in 2020-11-30 14:14
* @modified by
*/
@Configuration
public class RocketMQConfig {
/**
* 生产者
* Send Messages Synchronously 同步发送消息
*
* @return producer
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer producer() {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("paymentGroup");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.8.246:9876");
return producer;
}
/**
* 消费者
*
* @param messageListener
* @return consumer
* @throws MQClientException
* @description @Qualifier("messageListener") 之后需要单独写一个Bean注入
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.8.246:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("payment", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(messageListener);
return consumer;
}
}
支付消息队列
com.example.tccdemo.service.PaymenService
新增方法
/**
* 支付接口 (消息队列:生产者)
*
* @param userId
* @param orderId
* @param amount
* @return 0:代表成功、1: 用户不存在、2: 余额不足
*/
@Transactional(transactionManager = "tm246", rollbackFor = Exception.class)
public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
/**
* 账户的扣减操作
*/
// 账户是否存在
AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) {
return 1;
}
// 账户的余额小于你扣减的金额
if (accountA.getBalance().compareTo(amount) < 0) {
return 2;
}
// 账户余额减去传入的金额
accountA.setBalance(accountA.getBalance().subtract(amount));
accountAMapper.updateByPrimaryKey(accountA);
Message message = new Message();
// 必须项:分类
message.setTopic("payment");
message.setKeys(orderId + "");
message.setBody("订单已支付".getBytes());
try {
SendResult result = producer.send(message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return 0;
} else {
// 如果状态不是 SEND_OK 就直接抛出异常, 回滚整个方法. 保证原子性
throw new Exception("消息发送失败!");
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
// 必需抛出, 不能让方法正常结束, 不然就不会触发事务的回滚
throw e;
}
}
@Qualifier("messageListener") 单独写一个Bean注入
package com.example.tccdemo.consumer;
import com.example.tccdemo.db247.dao.OrderMapper;
import com.example.tccdemo.db247.model.Order;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import static org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
import static org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.RECONSUME_LATER;
/**
* @author eddie.lee
* @ProjectName tcc-demo
* @Package com.example.tccdemo.consumer
* @ClassName ChangeOrderStatus
* @description
* @date created in 2020-11-30 15:14
* @modified by
*/
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {
@Resource
private OrderMapper orderMapper;
/**
* @param list 取得多个消息
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list == null || list.size() == 0) {
// 返回消费成功
return CONSUME_SUCCESS;
}
/**
* 取到消费是一个集合
* 比如:获取了两条消息队列的消息,
* 第一个消息消费成功
* 第二个消息消费失败, 返回再次消费(RECONSUME_LATER)
* 但是第一个消息是成功的
*
* MessageListenerConcurrently: 不建议抛出异常,而不是在以下情况下返回ConsumeConcurrentlyStatus.RECONSUME_LATER
* DefaultMQPushConsumer.consumeMessageBatchMaxSize = 1,您可以在此处修改
*
* 按 MessageListenerConcurrently 意思,就是每次取一条,所以不用担心第二条失败而导致第一条出现问题
*/
for (MessageExt item : list) {
String orderId = item.getKeys();
byte[] body = item.getBody();
String msg = new String(body);
System.out.println("msg=" + msg);
Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
if (order == null) {
// 返回再次消费
return RECONSUME_LATER;
}
try {
// 已支付
order.setOrderStatus(1);
order.setUpdateTime(new Date());
// 更新操作时间
order.setUpdateUser(0);
orderMapper.updateByPrimaryKey(order);
} catch (Exception e) {
e.printStackTrace();
return RECONSUME_LATER;
}
}
// 返回消费成功
return CONSUME_SUCCESS;
}
}
测试
控制层
@RequestMapping("/payment")
private String payment(int userId, int orderId, BigDecimal amount) throws Exception {
int result = paymenService.pamentMQ(userId, orderId, amount);
return "支付结果:" + result;
}
GET
http://localhost:8080/payment?userId=1&orderId=100&amount=200
数据库变化
account_a 扣减金额
t_order 状态更改=1
总结
- 请求接口
- com.example.tccdemo.controller.PaymenController#payment
- 生产者部分
- 支付接口:com.example.tccdemo.service.PaymenService#pamentMQ
- 扣减操作
- 订单号和主题消息“订单已支付”,放入消息队列 (主要部分是MQ出错的逻辑判断)
- 支付接口:com.example.tccdemo.service.PaymenService#pamentMQ
- 消费者部分
- com.example.tccdemo.config.RocketMQConfig#consumer -- @Qualifier("messageListener")
- 编写 messageListener 的 @Bean -- com.example.tccdemo.consumer.ChangeOrderStatus#consumeMessage
- 从List中获取订单号(OrderId)、消息(msg=订单已支付)
- 通过获取到OrderId查询订单,修改支付状态 (关键点:ConsumeConcurrentlyStatus的枚举状态)