[TOC]
# 分布式事务
## 本地消息
> 本节主要记录订单号,订单状态,重试次数。
## 准备工作
[使用的基础代码](https://blog.eddilee.cn/archives/%E4%BA%8B%E5%8A%A1%E8%A1%A5%E5%81%BF%E6%9C%BA%E5%88%B6%E6%98%AF%E4%BB%80%E4%B9%88)
### 原理
- 采用BASE原理, 保证事务最终一致
- 允许部分成功、部分失败
- 在一致性方面,允许一段时间内的不一致,但最终会一致
- 在实际的系统当中,要根据具体情况,判断是否采用
- 基于本地消息表的方案中,将<font color="red">本事务外</font>操作,记录在消息表中
- 例子:在电商平台中, 订单和支付两个产生两个事务,订单是在自己系统当中, 而支付是在第三方平台当中。在第三方支付就产生一条记录进行保存,就会有一个回调和定时去查询这条支付订单。
- 其他事务, 提供操作接口
- 定时任务轮询本地消息表, 将未执行的消息发送给操作接口
- 操作接口处理成功, 返回成功标识, 处理失败返回失败标识
- 定时任务接到标识, 更新消息的状态
- 定时任务按照一定的周期反复执行
- 对于屡次失败的消息, 可以设置最大失败次数
- 超过最大失败次数的消息, 不再进行接口调用
- 等待人工处理
### 本地消息表
- 优点:避免了分布式事务, 实现了最终一致性
- 缺点:要注意重试时的幂等性操作
### 数据库设计
支付表
```sql
CREATE TABLE `xa_246`.`payment_msg` (
`id` int(11) AUTO_INCREMENT NOT NULL,
`order_id` int(11) NOT NULL,
`status` int(1) NOT NULL DEFAULT 0 COMMENT '0=未发送,1=发送成功,2=超过最大发送次数',
`falure_cnt` int(1) NOT NULL DEFAULT 0 COMMENT '失败次数:最大5次',
`create_user` int(11) NOT NULL,
`create_time` datetime NOT NULL,
`update_user` int(11) NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`)
)
```
订单表
```sql
CREATE TABLE `xa_247`.`t_order` (
`id` int(11) AUTO_INCREMENT NOT NULL,
`order_status` int(1) NOT NULL DEFAULT 0 COMMENT '0=未支付,1=已支付',
`order_amount` decimal(10, 2) NOT NULL,
`receive_user` varchar(255) NOT NULL,
`receive_phone` varchar(11) NOT NULL,
`create_user` int(0) NOT NULL,
`create_time` datetime(0) NOT NULL,
`update_user` int(0) NOT NULL,
`update_time` datetime(0) NOT NULL,
PRIMARY KEY (`id`)
)
```
> 使用MyBatis逆向生成基础代码
### 支付接口
业务层
```java
@Service
public class PaymenService {
@Resource
private AccountAMapper accountAMapper;
@Resource
private PaymentMsgMapper paymentMsgMapper;
/**
* 支付接口
*
* @param userId
* @param orderId
* @param amount
* @return 0:代表成功、1: 用户不存在、2: 余额不足
*/
@Transactional(transactionManager = "tm246")
public int pament(int userId, int orderId, BigDecimal amount) {
/**
* 账户的扣减操作
*/
// 账户是否存在
AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) {
return 1;
}
// 账户的余额小于你扣减的金额
if (accountA.getBalance().compareTo(amount) < 0) {
return 2;
}
// 账户余额减去传入的金额
accountA.setBalance(accountA.getBalance().subtract(amount));
accountAMapper.updateByPrimaryKey(accountA);
/**
* 支付信息表存放一条记录
*/
PaymentMsg paymentMsg = new PaymentMsg();
paymentMsg.setOrderId(orderId);
// 未发送
paymentMsg.setStatus(0);
// 失败次数
paymentMsg.setFalureCnt(0);
paymentMsg.setCreateTime(new Date());
paymentMsg.setCreateUser(userId);
paymentMsg.setUpdateTime(new Date());
paymentMsg.setUpdateUser(userId);
paymentMsgMapper.insertSelective(paymentMsg);
return 0;
}
}
```
控制层
```java
@RestController
public class PaymenController {
@Autowired
private PaymenService paymenService;
@RequestMapping("/payment")
private String payment(int userId, int orderId, BigDecimal amount) {
int result = paymenService.pament(userId, orderId, amount);
return "支付结果:" + result;
}
}
```
### 订单接口
业务层
```java
@Service
public class OrderService {
@Resource
private OrderMapper orderMapper;
/**
* 订单回调接口
*
* @param orderId
* @return 0:代表成功、1:订单不存在
*/
public int handleOrder(int orderId) {
Order order = orderMapper.selectByPrimaryKey(orderId);
if (order == null) {
return 1;
}
// 已支付
order.setOrderStatus(1);
order.setUpdateTime(new Date());
// 更新操作时间
order.setUpdateUser(0);
orderMapper.updateByPrimaryKey(order);
return 0;
}
}
```
控制层
```java
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping("/handleOrder")
public String handleOrder(int orderId) {
try {
int result = orderService.handleOrder(orderId);
if (result == 0) {
return "success";
}
return "fail";
} catch (Exception e) {
return "fail";
}
}
}
```
### 定时任务
```xml
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
```
> @EnableScheduling
支付业务层
```java
@Service
public class PaymenService {
@Resource
private AccountAMapper accountAMapper;
@Resource
private PaymentMsgMapper paymentMsgMapper;
/**
* 支付接口
*
* @param userId
* @param orderId
* @param amount
* @return 0:代表成功、1: 用户不存在、2: 余额不足
*/
@Transactional(transactionManager = "tm246")
public int pament(int userId, int orderId, BigDecimal amount) {
/**
* 账户的扣减操作
*/
// 账户是否存在
AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) {
return 1;
}
// 账户的余额小于你扣减的金额
if (accountA.getBalance().compareTo(amount) < 0) {
return 2;
}
// 账户余额减去传入的金额
accountA.setBalance(accountA.getBalance().subtract(amount));
accountAMapper.updateByPrimaryKey(accountA);
/**
* 支付信息表存放一条记录
*/
PaymentMsg paymentMsg = new PaymentMsg();
paymentMsg.setOrderId(orderId);
// 未发送
paymentMsg.setStatus(0);
// 失败次数
paymentMsg.setFalureCnt(0);
paymentMsg.setCreateTime(new Date());
paymentMsg.setCreateUser(userId);
paymentMsg.setUpdateTime(new Date());
paymentMsg.setUpdateUser(userId);
paymentMsgMapper.insertSelective(paymentMsg);
return 0;
}
}
```
支付控制层
```
@RestController
public class PaymenController {
@Autowired
private PaymenService paymenService;
@RequestMapping("/payment")
private String payment(int userId, int orderId, BigDecimal amount) {
int result = paymenService.pament(userId, orderId, amount);
return "支付结果:" + result;
}
}
```
订单业务层
```java
@Service
public class OrderService {
@Resource
private OrderMapper orderMapper;
/**
* 订单回调接口
*
* @param orderId
* @return 0:代表成功、1:订单不存在
*/
public int handleOrder(int orderId) {
Order order = orderMapper.selectByPrimaryKey(orderId);
// throw new RuntimeException("系统异常");
if (order == null) {
return 1;
}
// 已支付
order.setOrderStatus(1);
order.setUpdateTime(new Date());
// 更新操作时间
order.setUpdateUser(0);
orderMapper.updateByPrimaryKey(order);
return 0;
}
}
```
订单控制层
```java
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping("/handleOrder")
public String handleOrder(int orderId) {
try {
int result = orderService.handleOrder(orderId);
if (result == 0) {
return "success";
}
return "fail";
} catch (Exception e) {
return "fail";
}
}
}
```
定时任务业务层
```java
@Service
public class OrderScheduler {
@Resource
private PaymentMsgMapper paymentMsgMapper;
@Scheduled(cron = "0/10 * * * * ?")
public void orderNotify() throws IOException {
PaymentMsgExample paymentMsgExample = new PaymentMsgExample();
// 未发送
paymentMsgExample.createCriteria().andStatusEqualTo(0);
List<PaymentMsg> paymentMsgs = paymentMsgMapper.selectByExample(paymentMsgExample);
if (paymentMsgs == null || paymentMsgs.size() == 0) {
return;
}
for (PaymentMsg paymentMsg : paymentMsgs) {
Integer orderId = paymentMsg.getOrderId();
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
// post 请求
HttpPost httpPost = new HttpPost("http://localhost:8080/handleOrder");
// 传入参数 orderId
NameValuePair orderPair = new BasicNameValuePair("orderId", orderId + "");
List<NameValuePair> list = new ArrayList<>();
list.add(orderPair);
// list 放入表单中
HttpEntity httpEntity = new UrlEncodedFormEntity(list);
httpPost.setEntity(httpEntity);
CloseableHttpResponse response = httpClient.execute(httpPost);
String str = EntityUtils.toString(response.getEntity());
if ("success".equals(str)) {
// 发送成功
paymentMsg.setStatus(1);
paymentMsg.setUpdateTime(new Date());
// 系统更新
paymentMsg.setUpdateUser(0);
paymentMsgMapper.updateByPrimaryKey(paymentMsg);
} else {
Integer falureCnt = paymentMsg.getFalureCnt();
falureCnt++;
paymentMsg.setFalureCnt(falureCnt);
// 如果更新次数大于5
if (falureCnt > 5) {
// 发送失败
paymentMsg.setStatus(2);
}
paymentMsg.setUpdateTime(new Date());
// 系统更新
paymentMsg.setUpdateUser(0);
paymentMsgMapper.updateByPrimaryKey(paymentMsg);
}
}
}
}
```
### 测试流程
1. 运行支付接口 http://localhost:8080/payment?userId=1&orderId=100&amount=200
2. 运行回调接口 http://localhost:8080/handleOrder?orderId=100
3. 上面两个接口没有问题情况下,还原数据库表数据,继续下面测试
4. 测试重试次数: 保证t_order表字段order_status=0
5. handleOrder()方法,抛出 throw new RuntimeException("系统异常") 注释下面代码
6. 重新启动项目,留意一下: payment_msg 消息表中的status、falure_cnt, 定时任务会触发
7. 当 payment_msg 消息表中的status=2 就需要人工处理
分布式事务 - 本地消息表