Seata分布式事务

  • 分布式事务:
    • 刚性事务:强一致性(seata)
    • 柔性事务:最终一致性(MQ事务消息)

seata

Seata 是一款开源的分布式事务解决方案,强一致性,要么同时成功,要么同时失败

结合公司内部框架版本,集成seata

版本信息

  • SpringCloud:Hoxton.SR3
  • SpringBoot:2.2.5.RELEASE
  • seata:1.1.0
  • nacos:1.2.1
  • mysql:5.7

服务端集成步骤

seata-server下载

下载地址:https://github.com/seata/seata/releases/download/v1.1.0/seata-server-1.1.0.zip

创建seata数据库及表

  • 数据库名为seata
  • 创建表结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

服务端nacos配置文件

  • 创建config.txt
1
2
3
4
5
6
7
store.mode=db
store.db.datasource=dbcp
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.80.128:3306/seata?useUnicode=true
store.db.user=root
store.db.password=aacopy.cn
  • 创建nacos-config.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] "
exit 1
;;
esac
done

if [[ -z ${host} ]]; then
host=localhost
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${1}" "http://$2/nacos/v1/cs/configs?dataId=$3&group=$group&content=$4&tenant=$tenant" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $3=$4 successfully "
else
echo "Set $3=$4 failure "
(( failCount++ ))
fi
}

count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${contentType}" "${nacosAddr}" "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
  • config.txt要放在nacos-config.sh的上一级目录里,可以和github上目录保持一致,将nacos-config.sh放在nacos目录下,config.txt放在和nacos同一级目录
  • 在nacos-config.sh的目录中,右键打开git bash here
  • 执行命令

修改seata-server配置文件registry.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "192.168.80.128:8848"
namespace = ""
cluster = "default"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "192.168.80.128:8848"
namespace = ""
group = "SEATA_GROUP"
}
}

启动server

1
nohup ./seata-server.sh -h 192.168.80.128 -p 8091 &

客户端集成步骤

新增数据库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE IF NOT EXISTS `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

maven依赖

  • 添加依赖
1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
  • 排除依赖
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.quectel.web.cloud</groupId>
<artifactId>cloud-common-log</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<groupId>org.springframework.cloud</groupId>
</exclusion>
</exclusions>
</dependency>

客户端配置文件

添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
seata:
config:
type: nacos
nacos:
server-addr: 192.168.80.128:8848
namespace: public
username: nacos
password: nacos
registry:
type: nacos
nacos:
server-addr: 192.168.80.128:8848
namespace: public
username: nacos
password: nacos
tx-service-group: ${spring.application.name}_tx_group
application-id: ${spring.application.name}

nacos添加配置文件

  • Group: SEATA_GROUP
  • Data ID
    • service.vgroupMapping.datacenter_tx_group
    • service.vgroupMapping.rule_tx_group
  • 配置内容:default

添加注解@GlobalTransactional

在需要分布式事务的地方增加@GlobalTransactional注解,只需要在入口的服务上加,被rpc调用的服务可以不加

集成完毕测试

注意事项

优缺点

  • 优点:强一致性,要么同时成功,要么同时失败,不需要考虑失败后的数据补偿,集成简单,老代码改造容易
  • 缺点:性能较低,通过锁表实现。

使用场景

  • 需要强制性,业务简单,
  • 涉及到的表不多的情况
  • 并发量不高的操作

版本问题

当前最新版本1.4.2,在集成过程中会出现导致spring security oauth2的bean无法加载的报错,是官方bug,目前还没有正式版修复,只有补丁文件,可以参考:

http://www.aacopy.cn/2022/02/08/Seata/Seata-Demo1.4.2/

Seata-OpenFeign问题

  • 报错信息
1
Caused by: com.netflix.client.ClientException: Load balancer does not have available server for client: ip
  • 原因分析

    • 项目中同时引用了spring-cloud-alibaba-seataspring-cloud-starter-sleuth两个依赖。
    • Sleuth 的配置类TraceFeignClientAutoConfiguration 和 Seata 的配置类 SeataFeignClientAutoConfiguration 都创建了 FeignHystrixBuilder的bean,导致spring加载时冲突
  • 解决方案

    目前项目没有用到zipkin,sleuth,可以直接去掉该jar

更换默认版本

  • 如果服务端和客户端的版本不一致,客户端会报错

    1
    [main] io.seata.core.rpc.netty.NettyClientChannelManager 170 no available service 'default' found, please make sure registry config correct
  • 需要指定版本

    • 在根pom中增加

      1
      2
      3
      4
      5
      6
      7
      8
      9
      <dependencyManagement>
      <dependencies>
      <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      <version>1.2.0</version>
      </dependency>
      </dependencies>
      </dependencyManagement>
    • 在子模块pom中增加

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-alibaba-seata</artifactId>
      <version>2.2.0.RELEASE</version>
      <exclusions>
      <exclusion>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      </exclusion>
      </exclusions>
      </dependency>
      <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      <version>1.2.0</version>
      </dependency>

事务消息

通过消息中间件,进行解耦,只需要保证“本地事务”和“消息提交到MQ”两者同时成功或者失败,客户端在接收到消息后再做相应的业务处理,如果成功就标记消息消费成功,如果失败,进行重试或者数据补偿操作。只需要对业务数据保证最终的一致性即可。以RocketMQ为例

版本信息

RocketMQ:4.9.2

服务端集成步骤

服务端下载

二进制文件下载:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

启动服务

1
2
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &

可视化控制台安装

github:https://github.com/apache/rocketmq-externals

安装文档:https://rocketmq-1.gitbook.io/rocketmq-connector/rocketmq-connect/rocketmq-console/an-zhuang-shi-yong

docker安装

1
2
3
docker run -p 8080:8080 --name rocketmq-console-ng \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.80.128:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-d styletang/rocketmq-console-ng

客户端集成

maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.spring.boot.starter.version}</version>
</dependency>

配置文件

1
2
rocketmq.name-server=192.168.80.128:9876
rocketmq.producer.group=my-group

通过普通消息ACK机制实现

生产者

在生产者端,通过返回值SendResult获取消息发送状态,如果是OK,说明发送成功,提交本地事务,如果失败,抛出异常,回滚本地事务

1
2
3
4
5
6
SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", MessageBuilder.withPayload("Hello").build());
if(sendResult.getSendStatus() == SendStatus.SEND_OK) {
//消息发送成功
} else {
throw new RuntimeException("消息发送异常");
}

消费者

在消费者端,通过ACK机制,在成功消费消息后,手动确认消息已被消费,如果消费失败,这不确认,进行重试操作,或者通过数据补偿来保证数据一致性,通过将异常消息保存到数据库,然后成功消费消息,人工干预处理。在消费者端为了避免重复消费,需要做幂等处理

  • redis,通过setNX或者incr原子操作,确定消息已经被消费
  • 数据库将消息id作为唯一索引

问题

普通的消息已经基本满足业务分布式事务需求,只要做好消息幂等处理即可,考虑到异常情况:

  • 在发送消费到MQ后,服务器宕机了,然后本地事务被回滚,但是消息已经被发送到MQ,这种情况就没有保证本地事务和消息的一致性

事务消息

https://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/

  • 事务消息在一阶段对用户不可见
  • 在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态

生产者集成步骤

事务入口,改为消息事务

1
2
3
4
5
6
7
8
9
10
11
12
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("rocketmq-learn:order-create", MessageBuilder.withPayload(orderRequest).build(), orderRequest);
if(SendStatus.SEND_OK == transactionSendResult.getSendStatus()) {
if(LocalTransactionState.COMMIT_MESSAGE == transactionSendResult.getLocalTransactionState()) {
log.info("订单服务下单成功");
return "success";
} else if(LocalTransactionState.ROLLBACK_MESSAGE == transactionSendResult.getLocalTransactionState()) {
log.error("订单服务下单失败");
return "error";
} else if(LocalTransactionState.UNKNOW == transactionSendResult.getLocalTransactionState()){
return "warn";
}
}

创建listener监听事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.aacopy.learn.rocketmq.order.listener;

import cn.aacopy.learn.rocketmq.order.entity.OrderRequest;
import cn.aacopy.learn.rocketmq.order.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
* @author cmyang
* @date 2022/2/14 0:14
*/
@RocketMQTransactionListener
@Slf4j
public class TransactionListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderService orderService;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("{}", msg);
log.info("开始执行本地事务...");
MessageHeaders headers = msg.getHeaders();
//获取事务id
String transactionId = (String) headers.get(RocketMQUtil.toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID));
//获取tags
String tags = (String) headers.get(RocketMQUtil.toRocketHeaderKey(RocketMQHeaders.TAGS));
if(StringUtils.equals(tags, "order-create")) {
try {
orderService.create((OrderRequest) arg, transactionId);
log.info("本地事务执行成功");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务异常");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("检查本地事务状态");
log.info("{}", msg);
//查询数据库确定本地事务是否正常提交了
return RocketMQLocalTransactionState.COMMIT;
}
}

事务日志记录要和业务操作放在同一个事务里提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
@Transactional
public OrderDO create(OrderRequest orderRequest, String transactionId) {
//减少账户余额
int orderMoney = calculate(orderRequest.getCommodityCode(), orderRequest.getOrderCount());

//创建订单
OrderDO orderDO = new OrderDO();
orderDO.setUserId(orderRequest.getUserId());
orderDO.setCommodityCode(orderRequest.getCommodityCode());
orderDO.setCount(orderRequest.getOrderCount());
orderDO.setMoney(orderMoney);
save(orderDO);

TransactionLogDO transactionLogDO = new TransactionLogDO();
transactionLogDO.setTransactionId(transactionId);
transactionLogMapper.insert(transactionLogDO);
// int i=1/0;
return orderDO;
}

完成

消费者端和消费普通消息一样

注意事项:

优缺点:

  • 优点:事务消息将feign改为异步调用,大大提高了性能,保证数据最终一致
  • 缺点:本地事务一旦提交,是无法回滚的,如果需要回滚数据,需要根据具体业务做数据补偿,幂等处理,对老项目加入分布式事务要求较高,需要根据具体业务分析,异常情况的处理,公司项目基本都带了用户信息请求,如果改为消息,需要将用户信息带到消息内,并改造框架底层获取用户信息方法