分布式事务

1 分布式事务基础概念

1.1 事务概念

事务可以看做是一次大的活动,它由不同的小活动组成,这些活动要么全部成功,要么全部失败

你去小卖铺买东西,“一手交钱,一手交货”就是一个事务的例子,交钱和交货必须全部成功,事务才算成功,任一个活动失败,事务将撤销所有已成功的活动。

1.2 本地事务

回顾一下数据库事务的四大特性 ACID:

  • A(Atomic):原子性,构成事务的所有操作,要么都执行完成,要么全部不执行,不可能出现部分成功部分失败的情况。

  • C(Consistency):一致性,在事务执行前后,数据库的一致性约束没有被破坏。

  • I(Isolation):隔离性,数据库中的事务一般都是并发的,隔离性是指并发的两个事务的执行互不干扰,一个事务不能看到其他事务运行过程的中间状态。通过配置事务隔离级别可以避脏读、重复读等问题

  • D(Durability):持久性,事务完成之后,该事务对数据的更改会被持久化到数据库,且不会被回滚。

数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚

1.3 分布式事务

随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用。分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。

2 分布式事务基础理论

2.1 CAP理论

CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。

  • C - Consistency:一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上,从任意结点读取到的数据都是最新的状态

如何实现一致性?

  1. 写入主数据库后要将数据同步到从数据库。
  2. 写入主数据库后,在向从数据库同步期间要将从数据库锁定,待同步完成后再释放锁,以免在新数据写入成功后,向从数据库查询到旧的数据。

分布式系统一致性的特点:

  1. 由于存在数据同步的过程,写操作的响应会有一定的延迟。

  2. 为了保证数据一致性会对资源暂时锁定,待数据同步完成释放锁定资源。

  3. 如果请求数据同步失败的结点则会返回错误信息,一定不会返回旧数据。

  4. A - Availability :可用性是指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误

如何实现可用性?

  1. 写入主数据库后要将数据同步到从数据库。
  2. 由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定。
  3. 即时数据还没有同步过来,从数据库也要返回要查询的数据,哪怕是旧数据,如果连旧数据也没有则可以按照约定返回一个默认信息,但不能返回错误或响应超时。

分布式系统可用性的特点:

  1. 所有请求都有响应,且不会出现响应超时或响应错误。

  2. P - Partition tolerance :通常分布式系统的各各结点部署在不同的子网,这就是网络分区,不可避免的会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这叫分区容忍性。

如何实现分区容忍性?

  1. 尽量使用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据,这样结点之间能有效的实现松耦合。
  2. 添加从数据库结点,其中一个从结点挂掉其它从结点提供服务。

分布式分区容忍性的特点:

  1. 分区容忍性分是布式系统具备的基本能力。

在所有分布式事务场景中不会同时具备CAP三个特性,因为在具备了P的前提下C和A是不能共存的。

所以在生产中对分布式事务处理时要根据需求来确定满足CAP的哪两个方面。

  • AP:放弃一致性,追求分区容忍性和可用性。这是很多分布式系统设计时的选择。通常实现AP都会保证最终一致性,后面讲的BASE理论就是根据AP来扩展的,一些业务场景 比如:订单退款,今日退款成功,明日账户到账,只要用户可以接受在一定时间内到账即可。
  • CP:放弃可用性,追求一致性和分区容错性,我们的zookeeper其实就是追求的强一致,又比如跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。
  • CA:放弃分区容忍性,即不进行分区,不考虑由于网络不通或结点挂掉的问题,则可以实现一致性和可用性。那么系统将不是一个标准的分布式系统,我们最常用的关系型数据就满足了CA。

2.2 Base理论

CAP理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项,其中AP在实际应用中较多,AP即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性,比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,这种一致性和CAP中的一致性不同,CAP中的一致性要求在任何时间查询每个结点数据都必须一致,它强调的是强一致性,但是最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。

BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

  • 基本可用:分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。如,电商网站交易付款出现问题了,商品依然可以正常浏览。
  • 软状态:由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用性,如订单的"支付中”、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。
  • 最终一致:最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付成功”或者"支付失败”,使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

3 Seata Server的启动

为了后面的学习,我们这里先启动下SEATA的服务端,这里采用Docker安装,并之前已经安装好Nacos、MySQL等

3.1 创建数据库

我们创建一个Seata的数据库,并创建用户添加该数据库权限

image-20250820005949220

下面是建表脚本,来源Github

-- @link https://github.com/apache/incubator-seata/blob/2.x/script/server/db/mysql.sql
CREATE DATABASE IF NOT EXISTS `seata`;

USE `seata`;

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);


CREATE TABLE IF NOT EXISTS `vgroup_table`
(
    `vGroup`    VARCHAR(255),
    `namespace` VARCHAR(255),
    `cluster`   VARCHAR(255),
    UNIQUE KEY `idx_vgroup_namespace_cluster` (`vGroup`,`namespace`,`cluster`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

3.2 配置Seata并启动

我们这里用最新的seata-server:2.5.0

docker pull apache/seata-server:2.5.0

把配置文件拷贝出来

docker run -itd --restart=always -p 8091:8091 -p 7091:7091 --name seata-server apache/seata-server:2.5.0

docker cp seata-server:/seata-server/resource/application.example.yml /opt/seata/config/application.yml

这里的配置主要参考官方文档,主要将配置移动到Nacos,以及将数据存储修改为MySQL

seata:
  config:
    type: nacos
    nacos:
      server-addr: 192.168.18.99:8848
      namespace: 7d61b1fa-e877-4276-a049-ab1bc2f89258
      group: SEATA_GROUP
      data-id: seataServer.properties
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 192.168.18.99:8848
      group: SEATA_GROUP
      namespace: 7d61b1fa-e877-4276-a049-ab1bc2f89258

在Nacos上新建seataServer.properties配置文件,如下:

store.mode=db
store.db.dbType=mysql
store.db.datasource=druid
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://192.168.18.99:3306/seata?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true
store.db.user=seata
store.db.password=seata123
store.db.minConn=10
store.db.maxConn=100
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=1000
store.db.lockTable=lock_table
store.db.maxWait=5000
store.db.druid.timeBetweenEvictionRunsMillis=120000
store.db.druid.min-evictable-idle-time-millis=300000
store.db.druid.testWhileIdle=true
store.db.druid.testOnBorrow=false
store.db.druid.keepAlive=false
store.redis.mode=single
store.redis.single.host=192.168.18.99
store.redis.single.port=6379
store.redis.type=lua
store.redis.maxConn=100
store.redis.minConn=10
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=123456
store.redis.queryLimit=1000

# console
console.user.username=seata
console.user.password=seata
seata.security.secretKey=

最后挂载出来配置文件启动即可

docker rm -f seata-server

docker run -itd --restart=always -p 8091:8091 -p 7091:7091 -v /opt/seata/config/application.yml:/seata-server/resources/application.yml --name seata-server apache/seata-server:2.5.0

4 分布式事务解决方案之2PC

前面已经学习了分布式事务的基础理论,以理论为基础,针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。

4.1 2PC定义

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(Commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。

在计算机中部分关系数据库如Oracle、MySQL支持两阶段提交协议

  1. 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo日志,此时事务没有提交。(Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据文件)

  2. 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。

  3. 成功情况:

image-20250819153241788

  • 失败情况:

image-20250819153302730

4.2 解决方案

4.2.1 XA方案

2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义了分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。

DTP模型定义如下角色:

  • AP(Application Program):即应用程序,可以理解为使用DTP分布式事务的程序。
  • RM(Resource Manager):即资源管理器,可以理解为事务的参与者,一般情况下是指一个数据库实例,通过资源管理器对该数据库进行控制,资源管理器控制着分支事务。
  • TM(Transaction Manager):事务管理器,负责协调和管理事务,事务管理器控制着全局事务,管理事务生命周期,并协调各个RM。全局事务是指分布式事务处理环境中,需要操作多个数据库共同完成一个工作,这个工作即是一个全局事务。

TM向AP提供 应用程序编程接口,AP通过TM提交及回滚事务;TM交易中间件通过XA接口来通知RM数据库事务的开始、结束以及提交、回滚等。DTP模型定义TM和RM之间通讯的接口规范叫XA,简单理解为数据库提供的2PC接口协议,基于数据库的XA协议来实现2PC又称为XA方案

XA方案的执行流程如下:

  1. 准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;
  2. 提交阶段TM会接受RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放

XA方案的问题:

  1. 需要本地数据库支持XA协议。
  2. 资源锁需要等到两个阶段结束才释放,性能较差

4.2.2 Seata方案

Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。

传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。Seata把一个分布式事务理解成一个包含了若干分支事务全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。与传统2PC的模型类似,Seata定义了3个组件来协议分布式事务的处理过程:

image-20250819154128266

  • Transaction Coordinator (TC): 事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各各分支事务的提交或回滚。
  • Transaction Manager ™: 事务管理器,TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向TC发起全局提交或全局回滚的指令。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器TC的指令,驱动分支(本地)事务的提交和回滚。

我们拿新用户注册送积分举例Seata的分布式事务过程

image-20250819154242187

  1. 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
  2. 用户服务的 RM 向 TC 注册 分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖。
  3. 用户服务执行分支事务,向用户表插入一条记录。
  4. 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。
  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
  6. 用户服务分支事务执行完毕。
  7. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  8. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata实现2PC与传统2PC的差别

  • 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而Seata的 RM 是以jar包的形式作为中间件层部署在应用程序这一侧的。
  • 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。

4.2.3 Seata 2PC测试

测试代码可见链接

我们来个示例来测试一下Seata的2PC,首先确保SeataServer已经启动了

创建数据库demo_account和demo_storage,表结构如下

CREATE DATABASE `demo_account` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

use `demo_account`;

CREATE TABLE `tab_account` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
  `account_balance` decimal(18, 2) NULL DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =Dynamic;

INSERT INTO `tab_account` VALUES (1, '张三的账户', '1', 10000);

CREATE DATABASE `demo_storage` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

use `demo_storage`;

CREATE TABLE `tab_storage` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(20) NOT NULL COMMENT '商品ID',
  `stock` bigint(20) NOT NULL DEFAULT 0 COMMENT '商品库存',
  `sku_id` bigint(20) NOT NULL COMMENT 'SKU id',
  `price` decimal(18, 2) NOT NULL DEFAULT 0 COMMENT '商品价格',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =Dynamic;

INSERT INTO `tab_storage` VALUES (1, 10001, 100, 100010001, 100);

在demo_account和demo_storage分别创建undo_log表,参考Github

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

创建工程seata-demo-2pc-accountseata-demo-2pc-storage

下面配置文件和Pom列举一个,另外一个相似,pom文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>io.adrainty</groupId>
        <artifactId>seata-demo-2pc</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>seata-demo-2pc-account</artifactId>
    <packaging>pom</packaging>
    <modules>
        <module>seata-demo-2pc-account-feign</module>
    </modules>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-3-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

配置文件如下,注意group和namespace需要和seata-server启动配置的是一致的,不然会找不到服务

server:
  port: 8082

spring:
  application:
    name: account-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        namespace: 7d61b1fa-e877-4276-a049-ab1bc2f89258
        group: SEATA_GROUP
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/demo_account?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true
    username: root
    password: 123456

seata:
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace: ${spring.cloud.nacos.discovery.namespace}
      group: ${spring.cloud.nacos.discovery.group}
      application: seata-server
  # 事务组, 由它获得TC服务的集群名称
  tx-service-group: default_tx_group
  service:
    vgroup-mapping:
      # 事务组与TC服务集群的映射关系
      default_tx_group: ${spring.cloud.nacos.discovery.group}
  data-source-proxy-mode: AT

写一个简单的商品调用户扣减余额的功能,注意这里会涉及到ABA等问题,实际使用得用分布式锁 或者用SQL来保证一致性

StorageServiceImpl如下

@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper, StorageBO> implements IStorageService {

    @Resource
    private IAccountFeign accountFeign;

    @Override
    @GlobalTransactional(name = "do-shop", rollbackFor = Exception.class)
    public Boolean doShop(String accountNo, Long productId, Long skuId, Integer count) {
        Wrapper<StorageBO> wrapper = this.lambdaQuery()
                .eq(StorageBO::getProductId, productId)
                .eq(StorageBO::getSkuId, skuId);
        StorageBO storageBO = this.getOne(wrapper);
        if (storageBO == null) {
            throw new RuntimeException("商品不存在");
        }

        if (storageBO.getStock() < count) {
            throw new RuntimeException("库存不足");
        }

        boolean updated = this.lambdaUpdate()
                .eq(StorageBO::getProductId, productId)
                .eq(StorageBO::getSkuId, skuId)
                .eq(StorageBO::getStock, storageBO.getStock())
                .set(StorageBO::getStock, storageBO.getStock() - count)
                .update();
        if (!updated) {
            throw new RuntimeException("库存不足");
        }

        BigDecimal needDecrementBalance = storageBO.getPrice().multiply(new BigDecimal(count)).multiply(BigDecimal.valueOf(-1));
        AccountEntity accountEntity = AccountEntity.builder()
                .accountNo(accountNo)
                .accountBalance(needDecrementBalance)
                .build();
        Boolean result = accountFeign.addAccountBalance(accountEntity);
        if (!Boolean.TRUE.equals(result)) {
            throw new RuntimeException("账户余额不足");
        }
        return Boolean.TRUE;
    }

}

AccountServiceImpl如下

@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, AccountBO> implements IAccountService {

    @Override
    public Boolean addAccountBalance(AccountEntity param) {
        AccountBO accountBO = this.getOne(Wrappers.lambdaQuery(AccountBO.class)
                .eq(AccountBO::getAccountNo, param.getAccountNo()));
        if (accountBO == null) {
            throw new RuntimeException("账户不存在");
        }
        BigDecimal currentAccountBalance = accountBO.getAccountBalance();
        BigDecimal remainBalance = currentAccountBalance.add(param.getAccountBalance());
        if (remainBalance.compareTo(BigDecimal.ZERO) < 0) {
            throw new RuntimeException("账户余额不足");
        }

        Wrapper<AccountBO> updateWrapper = Wrappers.lambdaUpdate(AccountBO.class)
                .eq(AccountBO::getAccountNo, param.getAccountNo())
                .eq(AccountBO::getAccountBalance, currentAccountBalance)
                .set(AccountBO::getAccountBalance, remainBalance);

        return this.update(updateWrapper);
    }

}

在AccountServiceImpl打上断点,模拟网络不通的情况,当Feign超时后,放开断点,可以发现Account触发了Seata的全局事务回滚

image-20250820233242092

5 分布式事务解决方案之TCC

5.1 什么是TCC事务

TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。

分支事务成功的情况:

image-20250820233758882

分支事务失败的情况:

image-20250820233804639

TCC分为三个阶段:

  1. Try阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm一起才能真正构成一个完整的业务逻辑。
  2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
  3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。

TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑系统结构和软件复用。TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm 和cancel失败需进行重试,因此需要实现为幂等

幂等性是指同一个操作无论请求多少次,其结果都相同。

5.2 解决方案

上一节所讲的Seata也支持TCC,修改一下代码

增加IStorageTCC

@LocalTCC
public interface IStorageTCC {

    @TwoPhaseBusinessAction(name = "doShop", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext,
                    @BusinessActionContextParameter(paramName = "accountNo") String accountNo,
                    @BusinessActionContextParameter(paramName = "productId") Long productId,
                    @BusinessActionContextParameter(paramName = "skuId") Long skuId,
                    @BusinessActionContextParameter(paramName = "count") Integer count);

    boolean commit(BusinessActionContext actionContext);

    boolean rollback(BusinessActionContext actionContext);

}
  1. @LocalTCC:该注解开启TCC事务
  2. @TwoPhaseBusinessAction:该注解标注在try方法上,其中的三个属性如下:
  3. name:TCC事务的名称,必须是唯一的
  4. commitMethod:confirm方法的名称,默认是commit
  5. rollbackMethod:cancel方法的名称,默认是rollback
  6. confirm和cancel的返回值尤为重要,返回false则会不断的重试。

实现方法

@Slf4j
@Component
public class StorageTCCImpl implements IStorageTCC {

    @Resource
    private IAccountFeign accountFeign;

    @Resource
    private StorageMapper storageMapper;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String accountNo, Long productId, Long skuId, Integer count) {
        String xid = actionContext.getXid();
        log.info("storage prepare, xid: {}, accountNo: {}, productId: {}, skuId: {}, count: {}", xid, accountNo, productId, skuId, count);

        Wrapper<StorageBO> wrapper = Wrappers.lambdaQuery(StorageBO.class)
                .eq(StorageBO::getProductId, productId)
                .eq(StorageBO::getSkuId, skuId);
        StorageBO storageBO = storageMapper.selectOne(wrapper);
        if (storageBO == null) {
            throw new RuntimeException("商品不存在");
        }

        if (storageBO.getStock() < count) {
            throw new RuntimeException("库存不足");
        }

        // 冻结库存
        wrapper = Wrappers.lambdaUpdate(StorageBO.class)
                .eq(StorageBO::getProductId, productId)
                .eq(StorageBO::getSkuId, skuId)
                .eq(StorageBO::getStock, storageBO.getStock())
                .setSql("stock = stock - {0}", count)
                .setSql("frozen_stock = frozen_stock + {0}", count);
        boolean updated = storageMapper.update(wrapper) > 0;
        if (!updated) {
            throw new RuntimeException("库存不足");
        }

        // 冻结账户余额
        BigDecimal needDecrementBalance = storageBO.getPrice().multiply(new BigDecimal(count));
        AccountEntity accountEntity = AccountEntity.builder()
                .accountNo(accountNo)
                .accountBalance(needDecrementBalance)
                .build();
        accountFeign.frozenBalance(accountEntity);
        BusinessActionContextUtil.addContext("needDecrementBalance", needDecrementBalance);

        return Boolean.TRUE;
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        String xid = actionContext.getXid();

        // 获取参数
        String accountNo = Objects.requireNonNull(actionContext.getActionContext("accountNo")).toString();
        Long productId = Long.parseLong(Objects.requireNonNull(actionContext.getActionContext("productId")).toString());
        Long skuId = Long.parseLong(Objects.requireNonNull(actionContext.getActionContext("skuId")).toString());
        Integer count = Integer.parseInt(Objects.requireNonNull(actionContext.getActionContext("count")).toString());
        BigDecimal needDecrementBalance = (BigDecimal) actionContext.getActionContext("needDecrementBalance");
        log.info("storage commit, xid: {}, accountNo: {}, productId: {}, skuId: {}, count: {}", xid, accountNo, productId, skuId, count);

        // 释放库存
        Wrapper<StorageBO> wrapper = Wrappers.lambdaUpdate(StorageBO.class)
                .eq(StorageBO::getProductId, productId)
                .eq(StorageBO::getSkuId, skuId)
                .setSql("frozen_stock = frozen_stock - {0}", count);
        boolean updated = storageMapper.update(wrapper) > 0;
        if (!updated) return false;

        // 释放账户余额
        AccountEntity accountEntity = AccountEntity.builder()
                .accountNo(accountNo)
                .accountBalance(needDecrementBalance)
                .build();
        return accountFeign.addAccountBalance(accountEntity);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        // 获取参数
        try {
            String accountNo = Objects.requireNonNull(actionContext.getActionContext("accountNo")).toString();
            Long productId = Long.parseLong(Objects.requireNonNull(actionContext.getActionContext("productId")).toString());
            Long skuId = Long.parseLong(Objects.requireNonNull(actionContext.getActionContext("skuId")).toString());
            Integer count = Integer.parseInt(Objects.requireNonNull(actionContext.getActionContext("count")).toString());
            BigDecimal needDecrementBalance = (BigDecimal) actionContext.getActionContext("needDecrementBalance");
            log.info("storage rollback, accountNo: {}, productId: {}, skuId: {}, count: {}", accountNo, productId, skuId, count);

            // 恢复库存
            Wrapper<StorageBO> wrapper = Wrappers.lambdaUpdate(StorageBO.class)
                    .eq(StorageBO::getProductId, productId)
                    .eq(StorageBO::getSkuId, skuId)
                    .setSql("frozen_stock = frozen_stock - {0}", count)
                    .setSql("stock = stock + {0}", count);
            boolean updated = storageMapper.update(wrapper) > 0;
            if (!updated) return false;

            // 恢复账户余额
            // 释放账户余额
            AccountEntity accountEntity = AccountEntity.builder()
                    .accountNo(accountNo)
                    .accountBalance(needDecrementBalance)
                    .build();
            return accountFeign.addAccountBalance(accountEntity);
        } catch (Exception e) {
            log.error("storage rollback error", e);
            return false;
        }
    }

}

Sevice直接调用TCC的prepare方法即可

@Slf4j
@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper, StorageBO> implements IStorageService {

    @Resource
    private IStorageTCC storageTCC;

    @Override
    @GlobalTransactional
    public boolean doShop(String accountNo, Long productId, Long skuId, Integer count) {
        return storageTCC.prepare(null, accountNo, productId, skuId, count);
    }

}

5.3 改进

上述写法其实会造成一些问题,这就得提到TCC的三种异常处理:空回滚、幂等以及悬挂

  • 空回滚:在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回

    滚,然后直接返回成功。

出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

  • 幂等:通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。

解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

  • 悬挂:悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。

出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。

5.4 TCC和2PC比较

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能

而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

6 分布式事务解决方案之可靠消息最终一致性

6.1 可靠消息最终一致性定义

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。此方案需要利用消息中间件完成

事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。

image-20250821154733477

因此可靠消息最终一致性方案要解决以下几个问题:

  1. 本地事务与消息发送的原子性问题:本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。
  2. 事务参与方接受消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
  3. 消息重复消费的问题:由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

6.2 解决方案

6.2.1 本地消息表方案

本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

以注册送积分为例来说明

image-20250821155211535

  1. 用户注册:用户服务在本地事务新增用户和增加 ”积分消息日志“。这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。
  2. 定时任务扫描日志:经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
  3. 消费消息:积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。

如何保证消费者一定能消费到消息呢?

可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。

6.2.2 RocketMQ事务消息方案

RocketMQ是一个来自阿里巴巴的分布式消息中间件。RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题

image-20250821155451760

  1. Producer发送事务消息:Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
  2. MQ Server回应消息发送成功:MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
  3. Producer 执行本地事务:Producer 端执行业务代码逻辑,通过本地数据库事务控制。
  4. 消息投递:若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方即正常消费消息;若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后执行rollback逻辑 。MQ订阅方消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
  5. 事务回查:如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

RoacketMQ提供RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener {

    /**
     * 发送prepare消息成功此方法被回调,该方法用于执行本地事务
     * @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
     * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
     * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
     */
    RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
     * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
     */
    RocketMQLocalTransactionState checkLocalTransaction(final Message msg);
}

发送事务消息:

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//设置TransactionListener实现
producer.setTransactionListener(transactionListener);
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

7 分布式事务解决方案之最大努力通知

7.1 什么是最大努力通知

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子

image-20250821161634227

  1. 账户系统调用充值系统接口
  2. 充值系统完成支付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进行重复通知
  3. 账户系统接收到充值结果通知修改充值状态
  4. 账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

最大努力通知方案的目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

  1. 有一定的消息重复通知机制:因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知
  2. 消息校对机制:如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

7.2 最大努力通知与可靠消息区别

  1. 解决方案思想不同。可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
  2. 两者的业务应用场景不同。可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
  3. 技术解决方向不同。可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

7.3 解决方案

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知

image-20250821161941055

  1. 发起通知方将通知发给MQ。使用普通消息机制将通知发给MQ,如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。
  2. 接收通知方监听 MQ。
  3. 接收通知方接收消息,业务处理完成回应ack。
  4. 接收通知方若没有回应ack则MQ会重复通知。MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
  5. 接收通知方可通过消息校对接口来校对消息的一致性。