分布式事务

在分布式系统中一次操作由多个系统协同完成,这种一次事务操作涉及多个系统通过网络协同完成的过程称为分布式事务。

传统基于 XA 的分布式事务,许多 NoSQL 并不支持,同样为了保证较高的一致性,也必须牺牲可用性

传统的单机数据库的事务称为本地事务,而全局事务则涉及多个数据库的数据读写

基本模型

故障模式:

柔性事务与刚性事务

并发控制

由于单机锁的脆弱性,所以需要引入分布式锁来进行并发控制避免多个节点并发操作数据破坏一致性,一般数据复制都会拥有一个主节点,读可以在副本上进行,但写必须在主节点写,这样配合分布式锁,但如果没有主节点,那就需要使用法定人数同意协议来进行

当然有锁就会有死锁,为了检测死锁,每个节点可以维护一张全局的锁等待图,发生死锁,就协商牺牲掉某些事务,也可以由中心化的协调器来进行死锁检测与恢复

解决方案

两阶段提交(2PC)

sequenceDiagram    participant 事务协调器 as 事务协调器    participant 本地资源管理器1 as 本地资源管理器1    participant 本地资源管理器2 as 本地资源管理器2    Note over 事务协调器: 第一步:准备阶段    事务协调器 ->> 本地资源管理器1: 发送准备请求    本地资源管理器1 ->> 事务协调器: 就绪    事务协调器 ->> 本地资源管理器2: 发送准备请求    本地资源管理器2 ->> 事务协调器: 就绪    Note over 事务协调器: 第二步:提交阶段    事务协调器 ->> 本地资源管理器1: 提交请求    本地资源管理器1 ->> 事务协调器: 成功    事务协调器 ->> 本地资源管理器2: 提交请求    本地资源管理器2 ->> 事务协调器: 成功

准备阶段:协调者向参与者发起指令,参与者评估自己的状态,如果参与者评估指令可以完成,则会写redo或者undo日志,然后锁定资源,执行操作,但并不提交

提交阶段:如果每个参与者明确返回准备成功,则协调者向参与者发送提交指令,参与者释放锁定的资源,如何任何一个参与者明确返回准备失败,则协调者会发送中止指令,参与者取消已经变更的事务,释放锁定的资源。

准备阶段做了大量的工作,提交阶段是很轻量的,只要网络正常,成功的概率会非常高,分阶段把容易出错的工作和最终提交的步骤隔离开

常用的两阶段提交协议是XA

Percolator 对 2PC 做了一些改进:

  1. 准备阶段,事务管理器向分片发送 Prepare 请求,包含了具体的数据操作要求,分片接到请求后要做两件事,写日志和添加私有版本,私有版本只有当前事务能够操作,通常其他事务不能读写这条记录,然后从参与事务的分片中随机选择出一个作为主锁
  2. 提交阶段,事务管理器只需要和拥有主锁的分片通讯,发送 Commit 指令,后面会有异步的线程让其他分片的私有记录公开化

2PC 往往伴随着很大的延迟,为了缩短写操作的延迟:

三阶段提交(3PC)

为了缓解2PC的缺点 3PC增加了一个询问阶段

询问阶段:协调者询问参与者是否可以完成指令,协调者只需要回答是还是不是,而不需要做真正的操作,这个阶段超时将导致事务中止

准备阶段

提交阶段

三段式提交对单点问题和回滚时的性能问题有所改善,但是它对一致性风险问题并未有任何改进

共享事务

通过多个服务共用一个数据源的方式来实现,不过这种方式很鸡肋,因为往往数据库才是整个系统的瓶颈

事务补偿(TCC)

sequenceDiagram    participant 业务应用 as 业务应用    participant 事务协调器 as 事务协调器    participant 服务A as 服务A    participant 服务B as 服务B    participant 数据库A as 数据库A    participant 数据库B as 数据库B        业务应用 ->> 事务协调器: 1. 创建事务    业务应用 ->> 服务A: 2. 调用Try接口    服务A ->> 数据库A: 写数据    事务协调器 ->> 服务B: 3. 调用Try接口    服务B ->> 数据库B: 写数据    alt 成功        事务协调器 ->> 服务A: 4. 调用Confirm接口        服务A ->> 数据库A: 确认写数据        事务协调器 ->> 服务B: 4. 调用Confirm接口        服务B ->> 数据库B: 确认写数据    end    alt 失败        事务协调器 ->> 服务A: 4. 调用Cancel接口        服务A ->> 数据库A: 取消写数据        事务协调器 ->> 服务B: 4. 调用Cancel接口        服务B ->> 数据库B: 取消写数据    end

使用消息队列实现最终一致性

利用本地消息表

sequenceDiagram    订单系统 ->> 订单系统: 创建订单,插入本地消息    订单系统 ->> MQ: 减少库存    MQ ->> 库存系统: 减少库存    库存系统 ->> MQ: 减少库存,插入本地消息    库存系统 ->> MQ: 库存减少成功    MQ ->> 订单系统: 库存减少成功

在这种方案下,本地消息表是为了保证消息的可靠投递,如果只有事务的一部分成功,事务的其他部分如果失败后就不断重试,直至操作成功或者人工介入

支持事务的消息队列,本质上是TCC:

sequenceDiagram    订单系统 ->> MQ: 开启事务    订单系统 ->> MQ: 发送半消息    订单系统 ->> 订单系统: 执行本地事务创建订单    订单系统 ->> MQ: 提交/回滚    MQ ->> 库存系统: 事务被提交才会投递消息    MQ ->> 订单系统: 长时间没接收到提交或回滚请求    订单系统 ->> 订单系统: 进行事务反查, 确定订单是否创建成功    订单系统 ->> MQ: 提交/回滚    MQ ->> 库存系统: 事务被提交才会投递消息

最大努力通知方案

类似于第三方支付的支付回调 一直进行重试 直到成功为止

本地消息表

202031620440

补偿的方式

批注 2020-03-16 164628

一个通过消息队列实现最终一致性的案例

反诈项目目前有两个网,一个是公安网,一个是反诈内网,这两个互相隔离的网络,要想进行数据交互,就必须通过数据摆渡服务器来进行,泉州这边基础设施比较完善,可以通过 Kafka 来进行数据摆渡,像别的地方比较落后的,基本上都是通过 FTP 服务器来进行。

这个需求是这样的,数据需要从反诈内网的一个系统发送到公安网的另外一个系统。为了达到两边数据的一致性,就必须引入一些机制来保障。

消息队列使用的是 Kafka。

首先要考虑生产者的消息可靠性投递:

  1. 向 Kafka 发送数据要等待 Kafka 的 ack 确认,保证 Kafka 那边接收到
  2. 发送数据的同时通过本地事务写消息表,后续发送失败会通过这个消息表不断重试,成功后再删除消息记录

Kafka 需要保证的是数据不丢失,由于 Kafka 是先写到缓存后面再统一刷盘,所以极端情况下像断电是有可能导致 broker 丢数据,使用 min.insync.replicas 来进一步降低丢数据的风险。

最后是消费者的可靠性消费:

  1. 配置手动签收数据,避免数据落库前消费者出现什么错误丢数据
  2. 做好重复消息消费幂等性处理,这点是通过使用数据库的唯一约束来保障的,一旦唯一 ID 重复,异常就能被捕获,如果传递过来的数据没有 ID,这边就能通过 预警时间 + 手机号 + 数据来源的方式来唯一生成一个ID

除了以上机制外,生产者那边也会定期发送反诈内网的那边的数据总量,接入到公安网这边的监控系统,公安网这边会定期巡检,如果发现两边数据量不一致,就需要让人工介入排查了

SAGA

(消息驱动的本地事务序列)

通过将事务拆分为一系列正向原子操作T1 T2 ... TN

与一系列的补偿原子操作:C1 C2 ... CN

这些操作都必须保证是幂等的,当事务发生失败,可以采取两种策略:

  1. 正向恢复 不断重试T 直至成功
  2. 反向恢复 反向执行补偿操作 将数据恢复至原始状态

使用SAGA维护一致性

使用SAGA完成一个跨服务的订单创建

每一步所产生的子事务都会被写入数据库,必须通过补偿事务的方式来进行回滚。这样的事务缺少隔离性

协作模式

协同式:事务的决策和执行逻辑分布在每个参与方中

协同式

编排式:决策和执行逻辑集中在编排器类,由该中心指挥各个参与方

中心编排

为避免编排器类含有过多的业务逻辑,可通过设计只负责排序的编排器解决这个问题。

实现隔离性

saga包含三种事务类型:

对策:

LCN

原理

2020311161130

使用

客户端

<dependency>    <groupId>com.codingapi.txlcn</groupId>    <artifactId>txlcn-tc</artifactId>    <version>5.0.2.RELEASE</version></dependency><dependency>    <groupId>com.codingapi.txlcn</groupId>    <artifactId>txlcn-txmsg-netty</artifactId>    <version>5.0.2.RELEASE</version></dependency>
@EnableDistributedTransaction

发起者

@LcnTransaction@Transactional(rollbackFor = Exception.class)public void consume(){    jdbcTemplate.update("INSERT INTO tb_order VALUES(1,1,'test')");    String result = producerRemote.home();}

参与者

@LcnTransaction@Transactional(rollbackFor = Exception.class)public String home() {    jdbcTemplate.update("UPDATE stock SET stock = stock -1 WHERE product_id = 1");    return name+port;}

集群

tx-lcn.client.manager-address=127.0.0.1:8070,127.0.0.1:8071