seata处理分布式事务

lijunyi2022-07-01SpringCloud微服务

简介

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata是什么?open in new window

官方下载open in new window

seata01

一个典型的分布式事务过程

DANGER

分布式事务处理过程可以概括为:1 id + 3组件模型

1、Transaction lD XID :全局唯一的事务ID

2、三组件概念open in new window

Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;

Transaction Manager (TM):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;

Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

处理过程😶‍🌫️

1、TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID;

2、XID 在微服务调用链路的上下文中传播;

3、RM 向 TC 注册分支事务,将其纳入XID对应全局事务的管辖;

4、TM 向 TC 发起针对XID的全局提交或回滚决议;

5、TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

seata02

怎么用

全局@GlobalTransactional -> SEATA的分布式交易解决方案

seata03

搭建Seata TC 协调者

1、下载 seata-server-1.4.0open in new window

2、创建seata所需要的表

seata运行需要将事务的信息保存在数据库,因此需要创建一些表,找到seata-1.4.0源码的script\server\db这个目录,将会看到以下SQL文件:

seata04

根据需要执行对应的数据库脚本

seata05

3、修改file.conf配置文件

TIP

file.conf 配置文件位于 conf 目录下,主要修改 service模块store模块

其中:service模块 新版本需要手动添加,可参照: file.confopen in new window

service模块: 自定义事务组名称

store模块: 事务日志存储模式为db、数据库连接信息

service {
  #1、自定义事务组名称
  vgroupMapping.my_test_tx_group = "fsp_tx_group"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

## transaction log store, only used in seata-server
store {
  ##2、事务日志存储模式为db
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## 数据库链接信息
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    ## 注意如果是mysql8版本,此处采用cj驱动,url后缀需要加serverTimezone=Asia/Shanghai
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8"
    user = "root"
    password = "admin"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    host = "127.0.0.1"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }

}

DANGER

注意:数据库采用 mysql8,连接信息需要修改,同时需要将 seata-server-1.4.0\seata\lib\jdbc 文件夹下 mysql8 的驱动复制到 lib 文件夹下

4、修改 registry.conf 配置文件

registry {
  # 使用Nacos作为seata的注册中心
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  # 剩余其他官方提供的不再展示......
}

config {
  # 注意这里,下一节将会详细描述
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

5、启动Nacos

将Nacos先启动,然后再启动seata-server,观察Nacos控制台中服务列表

seata06seata07

seata客户端搭建

上述已经将Seata的服务端(TC)搭建完成了,下面就以电商系统为例,通过编码方式实现分布式事务。

首先分别需要创建如下几个库和表:

TIP

seata_order:存储订单的数据库:

​ seata_order库下建t_order表;

seata_storage:存储库存的数据库:

​ seata_storage库下建t_storage表;

seata_account:存储账户信息的数据库:

​ seata_account库下建t_account表;

然后我们会创建3个微服务分别是 订单服务库存服务账户服务

模拟场景为:当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。

微服务准备

新建 订单服务 seata-order-service2001库存服务 seata-storage-service2002账户服务 seata-account-service2003

注:步骤 1-6 的代码在三个微服务里是相同的,此处就不重复贴出代码了

1、引入依赖

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <druid-spring-boot-starter.version>1.1.22</druid-spring-boot-starter.version>
        <mybatis-spring-boot-starter.version>2.1.4</mybatis-spring-boot-starter.version>
        <seata-all.version>1.4.0</seata-all.version>
    </properties>

    <dependencies>
        <!--nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>seata-all</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>${seata-all.version}</version>
        </dependency>
        <!--feign-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--web-actuator-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--mysql-druid-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid-spring-boot-starter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot-starter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

2、yml配置文件

server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        #自定义事务组名称需要与seata-server中的对应
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
    username: root
    password: admin
logging:
  level:
    io:
      seata: info
mybatis:
  mapperLocations: classpath:mapper/*.xml

3、新建 file.conf

resources 目录下新建 file.conf

transport {
  # tcp, unix-domain-socket
  type = "TCP"
  #NIO, NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
      bossThreadPrefix = "NettyBoss"
      workerThreadPrefix = "NettyServerNIOWorker"
      serverExecutorThread-prefix = "NettyServerBizHandler"
      shareBossWorker = false
      clientSelectorThreadPrefix = "NettyClientSelector"
      clientSelectorThreadSize = 1
      clientWorkerThreadPrefix = "NettyClientWorkerThread"
      # netty boss thread size
      bossThreadSize = 1
      #auto default pin or 8
      workerThreadSize = "default"
   }
  shutdown {
      # when destroy server, wait seconds
      wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

service {
  #transaction service group mapping
  #修改自定义事务组名称
  vgroupMapping.fsp_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}


client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
    sagaBranchRegisterEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
    defaultGlobalTransactionTimeout = 60000
    degradeCheck = false
    degradeCheckPeriod = 2000
    degradeCheckAllowTimes = 10
  }
  undo {
    dataValidation = true
    onlyCareUpdateColumns = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

## transaction log store
store {
  ## store mode: file、db
  mode = "db"

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize  = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize  = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize  = 16384
    # when recover batch read size
    sessionReloadReadSize  = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    dbType  = "mysql"
    driverClassName  = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "admin"
    minConn  = 1
    maxConn  = 3
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
}

4、新建 registry.conf

resources 目录下新建 registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

5、修改启动类

/**
 *  取消数据源的自动创建
 * @author LiJunYi
 */
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderMainApp2001
{

    public static void main(String[] args)
    {
        SpringApplication.run(SeataOrderMainApp2001.class, args);
    }
}

6、数据源代理配置

/**
 * 数据源代理配置
 * 使用Seata对数据源进行代理
 * @author LiJunYi
 */
@Configuration
public class DataSourceProxyConfig {

    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}

7、帐户服务接口

/**
 * 帐户服务
 *
 * @author LiJunYi
 */
@FeignClient(value = "seata-account-service")
public interface AccountService
{
    /**
     * 减少
     *
     * @param userId 用户id
     * @param money
     * @return {@link CommonResult}
     */
    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}

8、库存服务接口

/**
 * 库存服务
 *
 * @author LiJunYi
 */
@FeignClient(value = "seata-storage-service")
public interface StorageService
{
    /**
     * 减少
     *
     * @param productId 产品id
     * @param count
     * @return {@link CommonResult}
     */
    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

9、订单服务

/**
 * 订单服务
 *
 * @author LiJunYi
 */
public interface OrderService
{
    /**
     * 创建
     *
     * @param order 订单
     */
    void create(Order order);
}
/**
 * 订单服务impl
 *
 * @author LiJunYi
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService
{
    @Resource
    private OrderDao orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    /**
     * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
     * @GlobalTransactional注解开启分布式事务 
     */
    @Override
    @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
    public void create(Order order)
    {
        log.info("----->开始新建订单");
        //1 新建订单
        orderDao.create(order);

        //2 扣减库存
        log.info("----->订单微服务开始调用库存,做扣减Count");
        storageService.decrease(order.getProductId(),order.getCount());
        log.info("----->订单微服务开始调用库存,做扣减end");

        //3 扣减账户
        log.info("----->订单微服务开始调用账户,做扣减Money");
        accountService.decrease(order.getUserId(),order.getMoney());
        log.info("----->订单微服务开始调用账户,做扣减end");

        //4 修改订单状态,从零到1,1代表已经完成
        log.info("----->修改订单状态开始");
        orderDao.update(order.getUserId(),0);
        log.info("----->修改订单状态结束");

        log.info("----->下订单结束了,O(∩_∩)O哈哈~");

    }
}

测试

启动三个微服务,查看Nacos服务列表是否正确

seata08

订单库存金额表的初始值

seata09seata10seata11

正常执行情况下:

seata12seata13

模拟openFeign请求超时异常,测试事务是否回滚

seata14seata15

从上图可以看到,openFeign请求超时报错,事务进行了回滚。

Last Updated 6/14/2024, 3:05:31 AM