后端 事务 Seata分布式事务 cmyang 2022-02-17 2023-11-12
分布式事务:
刚性事务:强一致性(seata)
柔性事务:最终一致性(MQ事务消息)
seata Seata 是一款开源的分布式事务解决方案,强一致性,要么同时成功,要么同时失败
结合公司内部框架版本,集成seata
版本信息
SpringCloud:Hoxton.SR3
SpringBoot:2.2.5.RELEASE
seata:1.1.0
nacos:1.2.1
mysql:5.7
服务端集成步骤 seata-server下载 下载地址:https://github.com/seata/seata/releases/download/v1.1.0/seata-server-1.1.0.zip
创建seata数据库及表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 -- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(96), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
服务端nacos配置文件
1 2 3 4 5 6 7 store.mode =db store.db.datasource =dbcp store.db.dbType =mysql store.db.driverClassName =com.mysql.jdbc.Driver store.db.url =jdbc:mysql://192.168.80.128:3306/seata?useUnicode=true store.db.user =root store.db.password =aacopy.cn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 #!/usr/bin/env bash while getopts ":h:p:g:t:" optdo case $opt in h) host=$OPTARG ;; p) port=$OPTARG ;; g) group=$OPTARG ;; t) tenant=$OPTARG ;; ?) echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] " exit 1 ;; esac done if [[ -z ${host} ]]; then host=localhost fi if [[ -z ${port} ]]; then port=8848 fi if [[ -z ${group} ]]; then group="SEATA_GROUP" fi if [[ -z ${tenant} ]]; then tenant="" fi nacosAddr=$host :$port contentType="content-type:application/json;charset=UTF-8" echo "set nacosAddr=$nacosAddr " echo "set group=$group " failCount=0 tempLog=$(mktemp -u) function addConfig () { curl -X POST -H "${1} " "http://$2 /nacos/v1/cs/configs?dataId=$3 &group=$group &content=$4 &tenant=$tenant " >"${tempLog} " 2>/dev/null if [[ -z $(cat "${tempLog} " ) ]]; then echo " Please check the cluster status. " exit 1 fi if [[ $(cat "${tempLog} " ) =~ "true" ]]; then echo "Set $3 =$4 successfully " else echo "Set $3 =$4 failure " (( failCount++ )) fi } count=0 for line in $(cat $(dirname "$PWD " )/config.txt | sed s/[[:space:]]//g); do (( count++ )) key=${line%%=*} value=${line#*=} addConfig "${contentType} " "${nacosAddr} " "${key} " "${value} " done echo "=========================================================================" echo " Complete initialization parameters, total-count:$count , failure-count:$failCount " echo "=========================================================================" if [[ ${failCount} -eq 0 ]]; then echo " Init nacos config finished, please start seata-server. " else echo " init nacos config fail. " fi
config.txt要放在nacos-config.sh的上一级目录里,可以和github上目录保持一致,将nacos-config.sh放在nacos目录下,config.txt放在和nacos同一级目录
在nacos-config.sh的目录中,右键打开git bash here
执行命令
修改seata-server配置文件registry.conf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 registry { type = "nacos" nacos { serverAddr = "192.168.80.128:8848" namespace = "" cluster = "default" } } config { type = "nacos" nacos { serverAddr = "192.168.80.128:8848" namespace = "" group = "SEATA_GROUP" } }
启动server 1 nohup ./seata-server.sh -h 192.168.80.128 -p 8091 &
客户端集成步骤 新增数据库表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE TABLE IF NOT EXISTS `undo_log` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id', `branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME NOT NULL COMMENT 'create datetime', `log_modified` DATETIME NOT NULL COMMENT 'modify datetime', PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
maven依赖
1 2 3 4 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.quectel.web.cloud</groupId > <artifactId > cloud-common-log</artifactId > <exclusions > <exclusion > <artifactId > spring-cloud-starter-sleuth</artifactId > <groupId > org.springframework.cloud</groupId > </exclusion > </exclusions > </dependency >
客户端配置文件 添加配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 seata: config: type: nacos nacos: server-addr: 192.168 .80 .128 :8848 namespace: public username: nacos password: nacos registry: type: nacos nacos: server-addr: 192.168 .80 .128 :8848 namespace: public username: nacos password: nacos tx-service-group: ${spring.application.name}_tx_group application-id: ${spring.application.name}
nacos添加配置文件
Group: SEATA_GROUP
Data ID
service.vgroupMapping.datacenter_tx_group
service.vgroupMapping.rule_tx_group
配置内容:default
添加注解@GlobalTransactional 在需要分布式事务的地方增加@GlobalTransactional注解,只需要在入口的服务上加,被rpc调用的服务可以不加
集成完毕测试 注意事项 优缺点
优点:强一致性,要么同时成功,要么同时失败,不需要考虑失败后的数据补偿,集成简单,老代码改造容易
缺点:性能较低,通过锁表实现。
使用场景
需要强制性,业务简单,
涉及到的表不多的情况
并发量不高的操作
版本问题 当前最新版本1.4.2,在集成过程中会出现导致spring security oauth2的bean无法加载的报错,是官方bug,目前还没有正式版修复,只有补丁文件,可以参考:
http://www.aacopy.cn/2022/02/08/Seata/Seata-Demo1.4.2/
Seata-OpenFeign问题
1 Caused by: com.netflix.client.ClientException: Load balancer does not have available server for client: ip
更换默认版本
如果服务端和客户端的版本不一致,客户端会报错
1 [main] io.seata.core.rpc.netty.NettyClientChannelManager 170 no available service 'default' found, please make sure registry config correct
需要指定版本
在根pom中增加
1 2 3 4 5 6 7 8 9 <dependencyManagement > <dependencies > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.2.0</version > </dependency > </dependencies > </dependencyManagement >
在子模块pom中增加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-seata</artifactId > <version > 2.2.0.RELEASE</version > <exclusions > <exclusion > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.2.0</version > </dependency >
事务消息 通过消息中间件,进行解耦,只需要保证“本地事务”和“消息提交到MQ”两者同时成功或者失败,客户端在接收到消息后再做相应的业务处理,如果成功就标记消息消费成功,如果失败,进行重试或者数据补偿操作。只需要对业务数据保证最终的一致性即可。以RocketMQ为例
版本信息 RocketMQ:4.9.2
服务端集成步骤 服务端下载 二进制文件下载:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
启动服务 1 2 nohup sh bin/mqnamesrv & nohup sh bin/mqbroker -n localhost:9876 &
可视化控制台安装 github:https://github.com/apache/rocketmq-externals
安装文档:https://rocketmq-1.gitbook.io/rocketmq-connector/rocketmq-connect/rocketmq-console/an-zhuang-shi-yong
docker安装
1 2 3 docker run -p 8080:8080 --name rocketmq-console-ng \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.80.128:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -d styletang/rocketmq-console-ng
客户端集成 maven依赖 1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > ${rocketmq.spring.boot.starter.version}</version > </dependency >
配置文件 1 2 rocketmq.name-server =192.168.80.128:9876 rocketmq.producer.group =my-group
通过普通消息ACK机制实现 生产者 在生产者端,通过返回值SendResult获取消息发送状态,如果是OK,说明发送成功,提交本地事务,如果失败,抛出异常,回滚本地事务
1 2 3 4 5 6 SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1" , MessageBuilder.withPayload("Hello" ).build());if (sendResult.getSendStatus() == SendStatus.SEND_OK) { } else { throw new RuntimeException ("消息发送异常" ); }
消费者 在消费者端,通过ACK机制,在成功消费消息后,手动确认消息已被消费,如果消费失败,这不确认,进行重试操作,或者通过数据补偿来保证数据一致性,通过将异常消息保存到数据库,然后成功消费消息,人工干预处理。在消费者端为了避免重复消费,需要做幂等处理
redis,通过setNX或者incr原子操作,确定消息已经被消费
数据库将消息id作为唯一索引
问题 普通的消息已经基本满足业务分布式事务需求,只要做好消息幂等处理即可,考虑到异常情况:
在发送消费到MQ后,服务器宕机了,然后本地事务被回滚,但是消息已经被发送到MQ,这种情况就没有保证本地事务和消息的一致性
事务消息 https://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/
事务消息在一阶段对用户不可见
在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态
生产者集成步骤 事务入口,改为消息事务
1 2 3 4 5 6 7 8 9 10 11 12 TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("rocketmq-learn:order-create" , MessageBuilder.withPayload(orderRequest).build(), orderRequest); if (SendStatus.SEND_OK == transactionSendResult.getSendStatus()) { if (LocalTransactionState.COMMIT_MESSAGE == transactionSendResult.getLocalTransactionState()) { log.info("订单服务下单成功" ); return "success" ; } else if (LocalTransactionState.ROLLBACK_MESSAGE == transactionSendResult.getLocalTransactionState()) { log.error("订单服务下单失败" ); return "error" ; } else if (LocalTransactionState.UNKNOW == transactionSendResult.getLocalTransactionState()){ return "warn" ; } }
创建listener监听事务消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package cn.aacopy.learn.rocketmq.order.listener;import cn.aacopy.learn.rocketmq.order.entity.OrderRequest;import cn.aacopy.learn.rocketmq.order.service.OrderService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.apache.rocketmq.spring.support.RocketMQUtil;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener @Slf4j public class TransactionListener implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) { log.info("{}" , msg); log.info("开始执行本地事务..." ); MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQUtil.toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID)); String tags = (String) headers.get(RocketMQUtil.toRocketHeaderKey(RocketMQHeaders.TAGS)); if (StringUtils.equals(tags, "order-create" )) { try { orderService.create((OrderRequest) arg, transactionId); log.info("本地事务执行成功" ); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务异常" ); return RocketMQLocalTransactionState.ROLLBACK; } } return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message msg) { log.info("检查本地事务状态" ); log.info("{}" , msg); return RocketMQLocalTransactionState.COMMIT; } }
事务日志记录要和业务操作放在同一个事务里提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override @Transactional public OrderDO create (OrderRequest orderRequest, String transactionId) { int orderMoney = calculate(orderRequest.getCommodityCode(), orderRequest.getOrderCount()); OrderDO orderDO = new OrderDO (); orderDO.setUserId(orderRequest.getUserId()); orderDO.setCommodityCode(orderRequest.getCommodityCode()); orderDO.setCount(orderRequest.getOrderCount()); orderDO.setMoney(orderMoney); save(orderDO); TransactionLogDO transactionLogDO = new TransactionLogDO (); transactionLogDO.setTransactionId(transactionId); transactionLogMapper.insert(transactionLogDO); return orderDO; }
完成 消费者端和消费普通消息一样
注意事项: 优缺点:
优点:事务消息将feign改为异步调用,大大提高了性能,保证数据最终一致
缺点:本地事务一旦提交,是无法回滚的,如果需要回滚数据,需要根据具体业务做数据补偿,幂等处理,对老项目加入分布式事务要求较高,需要根据具体业务分析,异常情况的处理,公司项目基本都带了用户信息请求,如果改为消息,需要将用户信息带到消息内,并改造框架底层获取用户信息方法