Seata

suaxi
2021-05-05

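Seata is a distributed transaction solution that aims to provide high-performance, easy-to-use distributed transaction services in a microservice architecture. Official site: http://seata.io/zh-cn/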


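I. Three Roles

TC (Transaction Coordinator): maintains the state of global and branch transactions and drives the commit or rollback of the global transaction.

TM (Transaction Manager): defines the scope of a global transaction: it begins the global transaction, then commits it or rolls it back.

RM (Resource Manager): manages the resources that branch transactions operate on, talks to the TC to register branch transactions and report their status, and drives the commit or rollback of branch transactions.

Figure 1. The three roles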
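To make the division of labour between the three roles concrete, here is a minimal in-memory sketch. It is only a toy model: the class and method names (`RolesDemo`, `doLocalWork`, `simulate`, and so on) are made up for illustration and are not Seata's API, and there is no networking or persistence involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the three roles; every name here is hypothetical, not Seata's API. */
class RolesDemo {

    /** TC: records the branches of one global transaction and drives its outcome. */
    static class TransactionCoordinator {
        private final List<ResourceManager> branches = new ArrayList<>();
        void registerBranch(ResourceManager rm) { branches.add(rm); }
        void commitAll()   { for (ResourceManager rm : branches) rm.commitBranch(); }
        void rollbackAll() { for (ResourceManager rm : branches) rm.rollbackBranch(); }
    }

    /** RM: registers its branch with the TC, then commits or rolls back when told to. */
    static class ResourceManager {
        String state = "IDLE";
        void doLocalWork(TransactionCoordinator tc) {
            tc.registerBranch(this);
            state = "REGISTERED";
        }
        void commitBranch()   { state = "COMMITTED"; }
        void rollbackBranch() { state = "ROLLED_BACK"; }
    }

    /** TM: marks where the global transaction begins and decides how it ends. */
    static class TransactionManager {
        private final TransactionCoordinator tc;
        TransactionManager(TransactionCoordinator tc) { this.tc = tc; }
        void execute(Runnable business) {
            try {
                business.run();
                tc.commitAll();      // everything succeeded: global commit
            } catch (RuntimeException e) {
                tc.rollbackAll();    // any failure: global rollback
            }
        }
    }

    /** Runs one global transaction over two branches and returns their final states. */
    static List<String> simulate(boolean fail) {
        TransactionCoordinator tc = new TransactionCoordinator();
        ResourceManager storage = new ResourceManager();
        ResourceManager account = new ResourceManager();
        new TransactionManager(tc).execute(() -> {
            storage.doLocalWork(tc);
            account.doLocalWork(tc);
            if (fail) throw new IllegalStateException("simulated business failure");
        });
        return Arrays.asList(storage.state, account.state);
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [COMMITTED, COMMITTED]
        System.out.println(simulate(true));  // [ROLLED_BACK, ROLLED_BACK]
    }
}
```

The point of the sketch is that the business code never commits or rolls back a branch directly: the TM only reports the outcome, and the TC fans it out to every registered RM.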


II. Installation

GitHub: https://github.com/seata/seata (this walkthrough uses Nacos 2.0 and Seata 1.4.2)


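Edit the configuration files

1. /conf/file.conf (note the difference from releases before 1.0: from 1.0 onward, the service configuration can be specified in the application's yml file)

Change the store mode to database (so that transaction information is stored in a database) and configure the data source.

Figure 2. Editing file.conf


2. /conf/registry.conf

Figure 3. Editing registry.conf (part 1)

Figure 4. Editing registry.conf (part 2)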


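3. Create the MySQL tables

1. Create a database named seata

2. Run mysql.sql

Note: the table-creation SQL is referenced from /conf/README.md; following the "server" link there leads to the GitHub directory that holds the SQL scripts: https://github.com/seata/seata/tree/develop/script/server

Figure 5. Table-creation SQL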


4. Start the server

Start Nacos first, then start Seata (bin/seata-server.bat)


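III. Business Test

1. Business scenario

Create three microservices: order, storage, and account.

Overall flow: when a user places an order, the order service creates the order, calls the storage service remotely to deduct stock, then calls the account service remotely to deduct the account balance, and finally sets the order status to completed in the order service.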


2. Database tables

  1. Create three databases: seata_order (orders), seata_storage (stock), and seata_account (accounts)
  2. Create the corresponding tables
-- t_order
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) NULL DEFAULT NULL COMMENT 'user id',
  `product_id` bigint(11) NULL DEFAULT NULL COMMENT 'product id',
  `count` int(11) NULL DEFAULT NULL COMMENT 'quantity',
  `money` decimal(11, 0) NULL DEFAULT NULL COMMENT 'amount',
  `status` int(1) NULL DEFAULT NULL COMMENT 'order status: 0 = creating; 1 = completed',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- t_storage
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) NULL DEFAULT NULL COMMENT 'product id',
  `total` int(11) NULL DEFAULT NULL COMMENT 'total stock',
  `used` int(11) NULL DEFAULT NULL COMMENT 'used stock',
  `residue` int(11) NULL DEFAULT NULL COMMENT 'remaining stock',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `t_storage` VALUES (1, 1, 100, 0, 100);

-- t_account
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(11) NULL DEFAULT NULL COMMENT 'user id',
  `total` decimal(10, 0) NULL DEFAULT NULL COMMENT 'total credit',
  `used` decimal(10, 0) NULL DEFAULT NULL COMMENT 'used balance',
  `residue` decimal(10, 0) NULL DEFAULT 0 COMMENT 'remaining available credit',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `t_account` VALUES (1, 1, 1000, 0, 1000);


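The undo_log rollback table: each of the three databases needs its own copy, so run the following script once in each database (three times in total).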

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime(0) NOT NULL,
  `log_modified` datetime(0) NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
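The role of a rollback log can be illustrated with a small in-memory sketch: before a value is changed, its previous value (the "before image") is recorded, and a rollback replays those records newest-first. This is only a conceptual model under simplifying assumptions. `UndoLogDemo` and `UndoRecord` are hypothetical names, a `Map` stands in for a table, and the real undo_log rows are serialized into the `rollback_info` column rather than kept in memory.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy illustration of an undo log; not Seata's real classes or storage format. */
class UndoLogDemo {

    /** One undo record: which key was changed and what it held before the change. */
    static class UndoRecord {
        final String key;
        final Integer beforeImage; // null means the key did not exist yet
        UndoRecord(String key, Integer beforeImage) {
            this.key = key;
            this.beforeImage = beforeImage;
        }
    }

    private final Map<String, Integer> table = new HashMap<>();
    private final Deque<UndoRecord> undoLog = new ArrayDeque<>();

    /** Saves the before image, then applies the update. */
    void update(String key, int newValue) {
        undoLog.push(new UndoRecord(key, table.get(key)));
        table.put(key, newValue);
    }

    /** Replays the undo records newest-first to restore the original values. */
    void rollback() {
        while (!undoLog.isEmpty()) {
            UndoRecord r = undoLog.pop();
            if (r.beforeImage == null) {
                table.remove(r.key);
            } else {
                table.put(r.key, r.beforeImage);
            }
        }
    }

    /** After a commit the undo records are no longer needed. */
    void commit() {
        undoLog.clear();
    }

    Integer get(String key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        UndoLogDemo db = new UndoLogDemo();
        db.update("residue", 100);  // seed value, as in t_storage
        db.commit();
        db.update("residue", 90);   // deduct 10 inside a transaction
        db.rollback();              // the transaction fails
        System.out.println(db.get("residue")); // 100
    }
}
```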


3. seata-order-service

pom
<dependencies>
    <!-- API -->
    <dependency>
        <groupId>com.sw</groupId>
        <artifactId>cloud-api-commons</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    <!--nacos-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>

    <!--seata-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>seata-all</artifactId>
                <groupId>io.seata</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.4.0</version>
    </dependency>

    <!--feign-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>

    <!--web-actuator-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!--mysql-druid-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.16</version>
    </dependency>

    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>


yml
server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=true&serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

seata:
  enabled: true
  application-id: ${spring.application.name}
  # custom transaction group name
  tx-service-group: test_tx_group
  service:
    vgroup-mapping:
      test_tx_group: default
  config:
    nacos:
      namespace:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info
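
The `tx-service-group` and `vgroup-mapping` entries work as a two-step lookup: the transaction group name maps to a cluster name (`default` here), and the cluster name is then used to find the Seata server instances in the registry. The sketch below models that lookup with plain maps. The class `VgroupMappingDemo`, the method `resolve`, and the address `127.0.0.1:8091` are assumptions made for illustration; the real client performs this resolution internally.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of resolving a transaction group to server addresses; names are hypothetical. */
class VgroupMappingDemo {

    /**
     * Step 1: transaction group -> cluster name (seata.service.vgroup-mapping).
     * Step 2: cluster name -> server addresses (what the registry would return).
     */
    static List<String> resolve(String txServiceGroup,
                                Map<String, String> vgroupMapping,
                                Map<String, List<String>> registry) {
        String cluster = vgroupMapping.get(txServiceGroup);
        if (cluster == null) {
            throw new IllegalArgumentException("no vgroup-mapping for group " + txServiceGroup);
        }
        List<String> servers = registry.get(cluster);
        if (servers == null || servers.isEmpty()) {
            throw new IllegalStateException("no server registered for cluster " + cluster);
        }
        return servers;
    }

    public static void main(String[] args) {
        Map<String, String> vgroupMapping = new HashMap<>();
        vgroupMapping.put("test_tx_group", "default"); // same values as the yml above

        Map<String, List<String>> registry = new HashMap<>();
        registry.put("default", Arrays.asList("127.0.0.1:8091")); // assumed server address

        System.out.println(resolve("test_tx_group", vgroupMapping, registry)); // [127.0.0.1:8091]
    }
}
```

A practical consequence is that a group name in `tx-service-group` without a matching `vgroup-mapping` key cannot be resolved, which is why the same name, test_tx_group, appears in both places in the yml.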


Main class
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) // disable automatic DataSource creation
@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.sw.mapper")
public class SeataOrderMain2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMain2001.class, args);
    }
}


pojo
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Serializable {

    private Long id;

    private Long userId;

    private Long productId;

    private Integer count;

    private BigDecimal money;

    private Integer status; // order status: 0 = creating; 1 = completed

}


mapper
@Repository
public interface OrderMapper {

    /**
     * Create a new order
     * @param order
     */
    @Insert("insert into t_order(id,user_id,product_id,count,money,status) " +
            "values(null,#{userId},#{productId},#{count},#{money},0)")
    void create(Order order);

    /**
     * Update the order status
     * @param userId
     * @param status
     */
    @Update("update t_order set status = 1 " +
            "where user_id=#{userId} and status = #{status}")
    void update(@Param("userId")Long userId, @Param("status")Integer status);
}


service

OrderService

public interface OrderService {

    void create(Order order);
}

StorageService

@FeignClient("seata-storage-service")
public interface StorageService {

    @PostMapping("/storage/decrease")
    CommonResult decrease(@RequestParam("productId")Long productId, @RequestParam("count")Integer count);
}

AccountService

@FeignClient("seata-account-service")
public interface AccountService {

    @PostMapping("/account/decrease")
    CommonResult decrease(@RequestParam("userId")Long userId, @RequestParam("money")BigDecimal money);

}


service implementation
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private AccountService accountService;

    @Autowired
    private StorageService storageService;

    @Override
    public void create(Order order) {
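        // 1. Create the order
        log.info("=== Start creating the order ===");
        orderMapper.create(order);

        // 2. Deduct stock
        log.info("=== Order service calling the storage service ===");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("=== Order service called the storage service; stock deduction finished ===");

        // 3. Deduct the account balance
        log.info("=== Order service calling the account service ===");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("=== Order service called the account service; balance deduction finished ===");

        // 4. Update the order status
        log.info("=== Start updating the order status ===");
        // From 0 to 1; 1 means completed
        orderMapper.update(order.getUserId(), 0);
        log.info("=== Finished updating the order status ===");

        log.info("Order processing complete (*^_^*)");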
    }
}


controller
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @GetMapping("/create")
    public CommonResult create(Order order){
        orderService.create(order);
        return new CommonResult(200, "Order created successfully!");
    }
}


config

The main class excludes the data source auto-configuration, so the data source proxy has to be configured here.

@Configuration
public class DataSourceProxyConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource dataSource){
        return new DataSourceProxy(dataSource);
    }

}
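
The idea behind proxying the data source is that every SQL statement passes through a wrapper that can do extra work before delegating to the real data source. The decorator below is a minimal sketch of that pattern only. `SqlExecutor`, `PlainExecutor`, and `RecordingProxy` are hypothetical stand-ins, not `javax.sql.DataSource` or Seata's `DataSourceProxy`, and the "extra work" here is just recording the write statements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Decorator-pattern sketch of a data source proxy; all names are hypothetical. */
class ProxyDemo {

    /** Minimal stand-in for a data source. */
    interface SqlExecutor {
        int execute(String sql);
    }

    /** The "real" data source: pretends every statement affects one row. */
    static class PlainExecutor implements SqlExecutor {
        @Override
        public int execute(String sql) {
            return 1;
        }
    }

    /** The proxy: same interface, but it notes every write before delegating. */
    static class RecordingProxy implements SqlExecutor {
        private final SqlExecutor target;
        final List<String> intercepted = new ArrayList<>();

        RecordingProxy(SqlExecutor target) {
            this.target = target;
        }

        @Override
        public int execute(String sql) {
            String verb = sql.trim().split("\\s+")[0].toLowerCase(Locale.ROOT);
            if (verb.equals("insert") || verb.equals("update") || verb.equals("delete")) {
                intercepted.add(sql); // a real proxy would capture rollback data here
            }
            return target.execute(sql);
        }
    }

    public static void main(String[] args) {
        RecordingProxy proxy = new RecordingProxy(new PlainExecutor());
        proxy.execute("update t_storage set residue = residue - 10 where product_id = 1");
        proxy.execute("select residue from t_storage where product_id = 1");
        System.out.println(proxy.intercepted.size()); // 1
    }
}
```

Because the proxy implements the same interface as its target, calling code does not need to change, which matches how the `@Primary` proxy bean above is handed to the rest of the application in place of the Druid data source.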


4. seata-storage-service

yml
server:
  port: 2002

spring:
  application:
    name: seata-storage-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seata_storage?useSSL=true&serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: test_tx_group
  service:
    vgroup-mapping:
      test_tx_group: default
  config:
    nacos:
      namespace:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info


mapper
@Repository
public interface StorageMapper {

    /**
     * Deduct stock
     * @param productId
     * @param count
     */
    @Update("update t_storage " +
            "set used = used + #{count}," +
            "residue = residue - #{count} "+
            "where product_id = #{productId}"
    )
    void decrease(@Param("productId")Long productId, @Param("count")Integer count);
}


service
public interface StorageService {

    void decrease(Long productId, Integer count);
}


service implementation
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageMapper storageMapper;

    @Override
    public void decrease(Long productId, Integer count) {
        log.info("=== storage-service: start deducting stock ===");
        storageMapper.decrease(productId, count);
        log.info("=== storage-service: finished deducting stock ===");
    }
}

The remaining configuration is the same as in the order microservice module.


5. seata-account-service

mapper
@Repository
public interface AccountMapper {

    /**
     * Deduct the account balance
     * @param userId
     * @param money
     */
    @Update("update t_account set "+
            "used = used + #{money},"+
            "residue = residue - #{money} "+
            "where user_id = #{userId}"
    )
    void decrease(@Param("userId")Long userId, @Param("money") BigDecimal money);
}

The remaining configuration is set up the same way as in the storage microservice module.


6. Test without global transaction management

The seata-account-service module

1. Manually simulate a timeout

@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountMapper accountMapper;

    @Override
    public void decrease(Long userId, BigDecimal money) {
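        log.info("=== account-service: start deducting the account balance ===");
        // Simulate a timeout by sleeping before the update
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        accountMapper.decrease(userId, money);
        log.info("=== account-service: finished deducting the account balance ===");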
    }
}


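Open http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100 in a browser.

The request fails with a Read timed out exception:

Figure 6. Test with the manually added exception


The database state at this point:

Figure 7. Result of the exception test (without a global transaction)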


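7. Enable the global transaction

In the order module, add the global transaction annotation @GlobalTransactional to the method that starts the business flow (in this example that would be the create method of OrderServiceImpl).

Figure 8. Enabling the global transaction


Run the same test again: after the call times out, the data that had been modified in the databases is rolled back to its state before the operation.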
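
The all-or-nothing behaviour described above can be sketched with an in-memory model of the order flow. This is a toy simulation, not Seata: `GlobalTxDemo` and `placeOrder` are hypothetical names, three fields stand in for the three databases (seeded with the values inserted earlier), and a snapshot restore stands in for the undo-log rollback. In the real project the equivalent effect comes from annotating the order service method with `@GlobalTransactional`.

```java
/** Toy simulation of the order flow under a global transaction; names are hypothetical. */
class GlobalTxDemo {
    int storageResidue = 100;    // seed value of t_storage.residue
    int accountResidue = 1000;   // seed value of t_account.residue
    Integer orderStatus = null;  // null = no order row; 0 = creating; 1 = completed

    /**
     * Runs the four business steps as one unit of work.
     * Returns true on success; on failure restores every value and returns false.
     */
    boolean placeOrder(int count, int money, boolean accountCallTimesOut) {
        // Snapshot taken at the start of the "global transaction"
        int storageBefore = storageResidue;
        int accountBefore = accountResidue;
        Integer statusBefore = orderStatus;
        try {
            orderStatus = 0;                 // 1. create the order
            storageResidue -= count;         // 2. deduct stock
            if (accountCallTimesOut) {       // 3. deduct the balance (may time out)
                throw new IllegalStateException("Read timed out (simulated)");
            }
            accountResidue -= money;
            orderStatus = 1;                 // 4. mark the order as completed
            return true;
        } catch (RuntimeException e) {
            // Global rollback: undo every change made so far
            storageResidue = storageBefore;
            accountResidue = accountBefore;
            orderStatus = statusBefore;
            return false;
        }
    }

    public static void main(String[] args) {
        GlobalTxDemo failed = new GlobalTxDemo();
        failed.placeOrder(10, 100, true);
        System.out.println(failed.storageResidue + " " + failed.accountResidue + " " + failed.orderStatus);
        // 100 1000 null

        GlobalTxDemo ok = new GlobalTxDemo();
        ok.placeOrder(10, 100, false);
        System.out.println(ok.storageResidue + " " + ok.accountResidue + " " + ok.orderStatus);
        // 90 900 1
    }
}
```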
