分布式事务 - 本地消息表 | Eddie'Blog
分布式事务 - 本地消息表

分布式事务 - 本地消息表

eddie 595 2020-11-26

分布式事务

本地消息

本节主要记录订单号,订单状态,重试次数。

准备工作

使用的基础代码

原理

  • 采用BASE原理, 保证事务最终一致
    • 允许部分成功、部分失败
  • 在一致性方面,允许一段时间内的不一致,但最终会一致
  • 在实际的系统当中,要根据具体情况,判断是否采用
  • 基于本地消息表的方案中,将本事务外操作,记录在消息表中
    • 例子:在电商平台中, 订单和支付两个产生两个事务,订单是在自己系统当中, 而支付是在第三方平台当中。在第三方支付就产生一条记录进行保存,就会有一个回调和定时去查询这条支付订单。
  • 其他事务, 提供操作接口
  • 定时任务轮询本地消息表, 将未执行的消息发送给操作接口
  • 操作接口处理成功, 返回成功标识, 处理失败返回失败标识
  • 定时任务接到标识, 更新消息的状态
  • 定时任务按照一定的周期反复执行
  • 对于屡次失败的消息, 可以设置最大失败次数
  • 超过最大失败次数的消息, 不再进行接口调用
  • 等待人工处理

本地消息表

  • 优点:避免了分布式事务, 实现了最终一致性
  • 缺点:要注意重试时的幂等性操作

数据库设计

支付表

CREATE TABLE `xa_246`.`payment_msg`  (
  `id` int(11) AUTO_INCREMENT NOT NULL,
  `order_id` int(11) NOT NULL,
  `status` int(1) NOT NULL DEFAULT 0  COMMENT '0=未发送,1=发送成功,2=超过最大发送次数',
  `falure_cnt` int(1) NOT NULL DEFAULT 0 COMMENT '失败次数:最大5次',
  `create_user` int(11) NOT NULL,
  `create_time` datetime NOT NULL,
  `update_user` int(11) NOT NULL,
  `update_time` datetime NOT NULL,
  PRIMARY KEY (`id`)
)

订单表

CREATE TABLE `xa_247`.`t_order`  (
  `id` int(11) AUTO_INCREMENT NOT NULL,
  `order_status` int(1) NOT NULL DEFAULT 0 COMMENT '0=未支付,1=已支付',
  `order_amount` decimal(10, 2) NOT NULL,
  `receive_user` varchar(255) NOT NULL,
  `receive_phone` varchar(11) NOT NULL,
  `create_user` int(0) NOT NULL,
  `create_time` datetime(0) NOT NULL,
  `update_user` int(0) NOT NULL,
  `update_time` datetime(0) NOT NULL,
  PRIMARY KEY (`id`)
)

使用MyBatis逆向生成基础代码

支付接口

业务层

@Service
public class PaymenService {

    @Resource
    private AccountAMapper accountAMapper;

    @Resource
    private PaymentMsgMapper paymentMsgMapper;

    /**
     * 支付接口
     *
     * @param userId
     * @param orderId
     * @param amount
     * @return 0:代表成功、1: 用户不存在、2: 余额不足
     */
    @Transactional(transactionManager = "tm246")
    public int pament(int userId, int orderId, BigDecimal amount) {
        /**
         * 账户的扣减操作
         */
        // 账户是否存在
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if (accountA == null) {
            return 1;
        }
        // 账户的余额小于你扣减的金额
        if (accountA.getBalance().compareTo(amount) < 0) {
            return 2;
        }
        // 账户余额减去传入的金额
        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);

        /**
         * 支付信息表存放一条记录
         */
        PaymentMsg paymentMsg = new PaymentMsg();
        paymentMsg.setOrderId(orderId);
        // 未发送
        paymentMsg.setStatus(0);
        // 失败次数
        paymentMsg.setFalureCnt(0);
        paymentMsg.setCreateTime(new Date());
        paymentMsg.setCreateUser(userId);
        paymentMsg.setUpdateTime(new Date());
        paymentMsg.setUpdateUser(userId);
        paymentMsgMapper.insertSelective(paymentMsg);

        return 0;
    }
}

控制层

@RestController
public class PaymenController {

    @Autowired
    private PaymenService paymenService;

    @RequestMapping("/payment")
    private String payment(int userId, int orderId, BigDecimal amount) {
        int result = paymenService.pament(userId, orderId, amount);
        return "支付结果:" + result;
    }

}

订单接口

业务层

@Service
public class OrderService {

    @Resource
    private OrderMapper orderMapper;

    /**
     * 订单回调接口
     *
     * @param orderId
     * @return 0:代表成功、1:订单不存在
     */
    public int handleOrder(int orderId) {
        Order order = orderMapper.selectByPrimaryKey(orderId);
        if (order == null) {
            return 1;
        }
        // 已支付
        order.setOrderStatus(1);
        order.setUpdateTime(new Date());
        // 更新操作时间
        order.setUpdateUser(0);
        orderMapper.updateByPrimaryKey(order);
        return 0;
    }
}

控制层

@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    @RequestMapping("/handleOrder")
    public String handleOrder(int orderId) {
        try {
            int result = orderService.handleOrder(orderId);
            if (result == 0) {
                return "success";
            }
            return "fail";
        } catch (Exception e) {
            return "fail";
        }
    }
}

定时任务

<dependency>
	<groupId>org.apache.httpcomponents</groupId>
	<artifactId>httpclient</artifactId>
	<version>4.5.10</version>
</dependency>

@EnableScheduling

支付业务层

@Service
public class PaymenService {

    @Resource
    private AccountAMapper accountAMapper;

    @Resource
    private PaymentMsgMapper paymentMsgMapper;

    /**
     * 支付接口
     *
     * @param userId
     * @param orderId
     * @param amount
     * @return 0:代表成功、1: 用户不存在、2: 余额不足
     */
    @Transactional(transactionManager = "tm246")
    public int pament(int userId, int orderId, BigDecimal amount) {
        /**
         * 账户的扣减操作
         */
        // 账户是否存在
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if (accountA == null) {
            return 1;
        }
        // 账户的余额小于你扣减的金额
        if (accountA.getBalance().compareTo(amount) < 0) {
            return 2;
        }
        // 账户余额减去传入的金额
        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);

        /**
         * 支付信息表存放一条记录
         */
        PaymentMsg paymentMsg = new PaymentMsg();
        paymentMsg.setOrderId(orderId);
        // 未发送
        paymentMsg.setStatus(0);
        // 失败次数
        paymentMsg.setFalureCnt(0);
        paymentMsg.setCreateTime(new Date());
        paymentMsg.setCreateUser(userId);
        paymentMsg.setUpdateTime(new Date());
        paymentMsg.setUpdateUser(userId);
        paymentMsgMapper.insertSelective(paymentMsg);

        return 0;

    }

}

支付控制层

@RestController
public class PaymenController {

    @Autowired
    private PaymenService paymenService;

    @RequestMapping("/payment")
    private String payment(int userId, int orderId, BigDecimal amount) {
        int result = paymenService.pament(userId, orderId, amount);
        return "支付结果:" + result;
    }
}

订单业务层

@Service
public class OrderService {

    @Resource
    private OrderMapper orderMapper;

    /**
     * 订单回调接口
     *
     * @param orderId
     * @return 0:代表成功、1:订单不存在
     */
    public int handleOrder(int orderId) {
        Order order = orderMapper.selectByPrimaryKey(orderId);

//        throw new RuntimeException("系统异常");

        if (order == null) {
            return 1;
        }
        // 已支付
        order.setOrderStatus(1);
        order.setUpdateTime(new Date());
        // 更新操作时间
        order.setUpdateUser(0);
        orderMapper.updateByPrimaryKey(order);
        return 0;
    }

}

订单控制层

@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    @RequestMapping("/handleOrder")
    public String handleOrder(int orderId) {
        try {
            int result = orderService.handleOrder(orderId);
            if (result == 0) {
                return "success";
            }
            return "fail";
        } catch (Exception e) {
            return "fail";
        }
    }
}

定时任务业务层

@Service
public class OrderScheduler {

    @Resource
    private PaymentMsgMapper paymentMsgMapper;

    @Scheduled(cron = "0/10 * * * * ?")
    public void orderNotify() throws IOException {
        PaymentMsgExample paymentMsgExample = new PaymentMsgExample();
        // 未发送
        paymentMsgExample.createCriteria().andStatusEqualTo(0);
        List<PaymentMsg> paymentMsgs = paymentMsgMapper.selectByExample(paymentMsgExample);
        if (paymentMsgs == null || paymentMsgs.size() == 0) {
            return;
        }
        for (PaymentMsg paymentMsg : paymentMsgs) {
            Integer orderId = paymentMsg.getOrderId();
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();
            // post 请求
            HttpPost httpPost = new HttpPost("http://localhost:8080/handleOrder");
            // 传入参数 orderId
            NameValuePair orderPair = new BasicNameValuePair("orderId", orderId + "");
            List<NameValuePair> list = new ArrayList<>();
            list.add(orderPair);
            // list 放入表单中
            HttpEntity httpEntity = new UrlEncodedFormEntity(list);
            httpPost.setEntity(httpEntity);
            CloseableHttpResponse response = httpClient.execute(httpPost);
            String str = EntityUtils.toString(response.getEntity());
            if ("success".equals(str)) {
                // 发送成功
                paymentMsg.setStatus(1);
                paymentMsg.setUpdateTime(new Date());
                // 系统更新
                paymentMsg.setUpdateUser(0);
                paymentMsgMapper.updateByPrimaryKey(paymentMsg);
            } else {
                Integer falureCnt = paymentMsg.getFalureCnt();
                falureCnt++;
                paymentMsg.setFalureCnt(falureCnt);
                // 如果更新次数大于5
                if (falureCnt > 5) {
                    // 发送失败
                    paymentMsg.setStatus(2);
                }
                paymentMsg.setUpdateTime(new Date());
                // 系统更新
                paymentMsg.setUpdateUser(0);
                paymentMsgMapper.updateByPrimaryKey(paymentMsg);
            }
        }
    }
}

测试流程

  1. 运行支付接口 http://localhost:8080/payment?userId=1&orderId=100&amount=200
  2. 运行回调接口 http://localhost:8080/handleOrder?orderId=100
  3. 上面两个接口没有问题情况下,还原数据库表数据,继续下面测试
  4. 测试重试次数: 保证t_order表字段order_status=0
  5. handleOrder()方法,抛出 throw new RuntimeException("系统异常") 注释下面代码
  6. 重新启动项目,留意一下: payment_msg 消息表中的status、falure_cnt, 定时任务会触发
  7. 当 payment_msg 消息表中的status=2 就需要人工处理