ShardingSphere如何轻松驾驭Seata柔性分布式事务?
2024/11/16 23:02:45
本文主要是介绍ShardingSphere如何轻松驾驭Seata柔性分布式事务?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
0 前文
上一文解析了 ShardingSphere 强一致性事务支持 XAShardingTransactionManager ,本文继续:
- 讲解该类
- 介绍支持柔性事务的 SeataATShardingTransactionManager
sharding-transaction-xa-core中关于 XAShardingTransactionManager,本文研究 XATransactionManager 和 ShardingConnection 类实现。
1 XAShardingTransactionManager
1.1 init
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) { for (ResourceDataSource each : resourceDataSources) { // 根据传入的 ResourceDataSource创建XATransactionDataSource并缓存 cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager)); } // 对通过 SPI 创建的 XATransactionManager 也执行其 init 初始化 xaTransactionManager.init(); }
1.2 其它方法
实现也简单:
@Override public TransactionType getTransactionType() { return TransactionType.XA; } @SneakyThrows @Override public boolean isInTransaction() { return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus(); } @Override public Connection getConnection(final String dataSourceName) throws SQLException { return cachedDataSources.get(dataSourceName).getConnection(); }
1.3 事务操作相关
begin、commit 和 rollback直接委托保存在 XATransactionManager#TransactionManager 完成:
@SneakyThrows @Override public void begin() { xaTransactionManager.getTransactionManager().begin(); } @SneakyThrows @Override public void commit() { xaTransactionManager.getTransactionManager().commit(); } @SneakyThrows @Override public void rollback() { xaTransactionManager.getTransactionManager().rollback(); }
2 AtomikosTransactionManager
TransactionManager默认实现。
2.1 AtomikosXARecoverableResource
代表资源:
public final class AtomikosXARecoverableResource extends JdbcTransactionalResource { private final String resourceName; AtomikosXARecoverableResource(final String serverName, final XADataSource xaDataSource) { super(serverName, xaDataSource); resourceName = serverName; } // 比对SingleXAResource#ResourceName,确定是否在使用资源,此即设计包装 XAResource 的 SingleXAResource 类的原因 @Override public boolean usesXAResource(final XAResource xaResource) { return resourceName.equals(((SingleXAResource) xaResource).getResourceName()); } }
2.2 AtomikosXARecoverableResource
public final class AtomikosTransactionManager implements XATransactionManager { private final UserTransactionManager transactionManager = new UserTransactionManager(); private final UserTransactionService userTransactionService = new UserTransactionServiceImp(); @Override public void init() { userTransactionService.init(); } @Override public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { userTransactionService.registerResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource)); } @Override public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { userTransactionService.removeResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource)); } @Override @SneakyThrows public void enlistResource(final SingleXAResource xaResource) { transactionManager.getTransaction().enlistResource(xaResource); } @Override public TransactionManager getTransactionManager() { return transactionManager; } @Override public void close() { userTransactionService.shutdown(true); } }
对 Atomikos 的 UserTransactionManager、UserTransactionService 简单调用,Atomikos#UserTransactionManager 实现 TransactionManager 接口,封装所有 TransactionManager 需要完成的工作。
看完 sharding-transaction-xa-atomikos-manager,再看 sharding-transaction-xa-bitronix-manager 工程。基于 bitronix 的 XATransactionManager 实现方案
3 BitronixXATransactionManager
public final class BitronixXATransactionManager implements XATransactionManager { private final BitronixTransactionManager bitronixTransactionManager = TransactionManagerServices.getTransactionManager(); @Override public void init() { } @SneakyThrows @Override public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { ResourceRegistrar.register(new BitronixRecoveryResource(dataSourceName, xaDataSource)); } @SneakyThrows @Override public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { ResourceRegistrar.unregister(new BitronixRecoveryResource(dataSourceName, xaDataSource)); } @SneakyThrows @Override public void enlistResource(final SingleXAResource singleXAResource) { bitronixTransactionManager.getTransaction().enlistResource(singleXAResource); } @Override public TransactionManager getTransactionManager() { return bitronixTransactionManager; } @Override public void close() { bitronixTransactionManager.shutdown(); } }
XA两阶段提交核心类:
4 ShardingConnection
上图的整个流程源头ShardingConnection类,构造函数发现创建 ShardingTransactionManager 过程:
@Getter public final class ShardingConnection extends AbstractConnectionAdapter { public ShardingConnection(...) { ... shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType); } }
ShardingConnection多处用到上面创建的shardingTransactionManager。如:
createConnection
获取连接:
@Override protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException { return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection(); }
isInShardingTransaction
判断是否在同一事务:
private boolean isInShardingTransaction() { return null != shardingTransactionManager && shardingTransactionManager.isInTransaction(); }
setAutoCommit
@Override public void setAutoCommit(final boolean autoCommit) throws SQLException { if (TransactionType.LOCAL == transactionType) { super.setAutoCommit(autoCommit); return; } if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) { return; } if (autoCommit && shardingTransactionManager.isInTransaction()) { shardingTransactionManager.commit(); return; } if (!autoCommit && !shardingTransactionManager.isInTransaction()) { closeCachedConnections(); shardingTransactionManager.begin(); } }
事务类型为本地事务时,直接调用 ShardingConnection 父类 AbstractConnectionAdapter#setAutoCommit 完成本地事务自动提交:
- autoCommit=true 且运行在事务中,调shardingTransactionManager.commit()完成提交
- autoCommit=false 且当前不在事务中时,调 shardingTransactionManager.begin() 启动事务
commit、rollback
类似setAutoCommit ,按事务类型决定是否进行分布式提交和回滚:
@Override public void commit() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.commit(); } else { shardingTransactionManager.commit(); } } @Override public void rollback() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.rollback(); } else { shardingTransactionManager.rollback(); } }
ShardingSphere提供两阶段提交的 XA 协议实现方案的同时,也实现柔性事务。看完 XAShardingTransactionManager,来看基于 Seata 框架的柔性事务 TransactionManager 实现类 SeataATShardingTransactionManager。
5 SeataATShardingTransactionManager
该类完全采用阿里Seata框架提供分布式事务特性,而非遵循类似 XA 这样的开发规范,所以代码实现比 XAShardingTransactionManager 类层结构简单,复杂性都屏蔽在了框架内部。
集成 Seata,先要初始化 TMClient、RMClient,在 Seata 内部,这两个客户端之间会基于RPC通信。
SeataATShardingTransactionManager#init的initSeataRPCClient初始化这俩客户端对象:
// 根据 seata.conf 创建配置对象 FileConfiguration configuration = new FileConfiguration("seata.conf"); initSeataRPCClient() { String applicationId = configuration.getConfig("client.application.id"); Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file"); String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default"); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup); }
Seata也提供一套构建在 JDBC 规范之上的实现策略,类似03文介绍的 ShardingSphere 与 JDBC 规范之间兼容性。
Seata使用DataSourceProxy、ConnectionProxy代理对象,如DataSourceProxy:
实现了自定义Resource接口,继承AbstractDataSourceProxy(最终实现JDBC的DataSource接口)。所以,初始化 Seata 框架时,也要根据输入 DataSource 对象构建 DataSourceProxy,并通过 DataSourceProxy 获取 ConnectionProxy。
init、getConnection
@Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) { // 初始化 Seata 客户端 initSeataRPCClient(); // 创建 DataSourceProxy 并放入Map for (ResourceDataSource each : resourceDataSources) { dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource())); } } @Override public Connection getConnection(final String dataSourceName) { // 根据 DataSourceProxy 获取 ConnectionProxy return dataSourceMap.get(dataSourceName).getConnection(); }
初始化后,提供了事务开启和提交相关的入口。Seata的GlobalTransaction是核心接口,封装了面向用户操作层的分布式事务访问入口:
public interface GlobalTransaction { void begin() throws TransactionException; void begin(int timeout) throws TransactionException; void begin(int timeout, String name) throws TransactionException; void commit() throws TransactionException; void rollback() throws TransactionException; GlobalStatus getStatus() throws TransactionException; String getXid(); }
ShardingSphere 作 GlobalTransaction 的用户层,也基于 GlobalTransaction 完成分布式事务操作。但 ShardingSphere 并未直接使用这层,而是设计位于sharding-transaction-base-seata-at的SeataTransactionHolder类,保存线程安全的 GlobalTransaction 对象。
SeataTransactionHolder
final class SeataTransactionHolder { private static final ThreadLocal<GlobalTransaction> CONTEXT = new ThreadLocal<>(); static void set(final GlobalTransaction transaction) { CONTEXT.set(transaction); } static GlobalTransaction get() { return CONTEXT.get(); } static void clear() { CONTEXT.remove(); } }
使用 ThreadLocal 确保对 GlobalTransaction 访问的线程安全性。
咋判断当前操作是否处于一个全局事务?Seata存在一个上下文对象RootContex保存参与者和发起者之间传播的 Xid:
- 当事务发起者开启全局事务,将 Xid 填入 RootContext
- 然后 Xid 沿服务调用链一直传播,进而填充到每个事务参与者进程的 RootContext
- 事务参与者发现 RootContext 存在 Xid,就可知自己处于全局事务
因此,只需判断:
@Override public boolean isInTransaction() { return null != RootContext.getXID(); }
Seata 也提供针对全局事务的上下文类 GlobalTransactionContext,可用:
- getCurrent 获取一个 GlobalTransaction对象
- 或通过 getCurrentOrCreate 在无法获取 GlobalTransaction 对象时新建一个
就不难理解如下实现了
begin
@Override @SneakyThrows public void begin() { // 创建一个 GlobalTransaction,保存到 SeataTransactionHolder SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate()); // 从 SeataTransactionHolder 获取一个 GlobalTransaction,并调 begin 启动事务 SeataTransactionHolder.get().begin(); SeataTransactionBroadcaster.collectGlobalTxId(); }
注意到最后的类:
SeataTransactionBroadcaster
保存 Seata 全局 Xid 的一个容器类。事务启动时收集全局 Xid 并进行保存,而在事务提交或回滚时清空这些 Xid。
class SeataTransactionBroadcaster { String SEATA_TX_XID = "SEATA_TX_XID"; static void collectGlobalTxId() { if (RootContext.inGlobalTransaction()) { ShardingExecuteDataMap.getDataMap().put(SEATA_TX_XID, RootContext.getXID()); } } static void broadcastIfNecessary(final Map<String, Object> shardingExecuteDataMap) { if (shardingExecuteDataMap.containsKey(SEATA_TX_XID) && !RootContext.inGlobalTransaction()) { RootContext.bind((String) shardingExecuteDataMap.get(SEATA_TX_XID)); } } static void clear() { ShardingExecuteDataMap.getDataMap().remove(SEATA_TX_XID); } }
因此
commit、rollback和close
实现就清楚了:
@Override public void commit() { try { SeataTransactionHolder.get().commit(); } finally { SeataTransactionBroadcaster.clear(); SeataTransactionHolder.clear(); } } @Override public void rollback() { try { SeataTransactionHolder.get().rollback(); } finally { SeataTransactionBroadcaster.clear(); SeataTransactionHolder.clear(); } } @Override public void close() { dataSourceMap.clear(); SeataTransactionHolder.clear(); TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy(); }
sharding-transaction-base-seata-at 工程中的代码实际上就只有这些内容,这些内容也构成了在 ShardingSphere中 集成 Seata 框架的实现过程。
6 从源码到开发
本文给出应用程序咋集成 Seata 分布式事务框架的详细过程,ShardingSphere 提供一种模版实现。日常开发,若想在业务代码集成 Seata,可参考 SeataTransactionHolder、SeataATShardingTransactionManager 等核心代码,而无需太多修改。
7 总结
XAShardingTransactionManager理解难在从 ShardingConnection 到底层 JDBC 规范的整个集成和兼容过程。
8 集成Seata框架
参考 ShardingSphere 的实现:
1. 配置 Seata 环境
- 配置文件准备: 创建
seata.conf
文件,定义applicationId
和transactionServiceGroup
等参数。 - 启动 Seata 服务: 启动 Seata Server 并确保其与数据库的事务协调机制正常工作。
2. 初始化 Seata 客户端
项目中初始化 TMClient 和 RMClient,它们分别代表事务管理器和资源管理器:
FileConfiguration configuration = new FileConfiguration("seata.conf"); String applicationId = configuration.getConfig("client.application.id"); String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default"); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup);
3. 数据源代理
构建 DataSourceProxy
: 使用 Seata 的 DataSourceProxy
对数据源进行代理。
DataSourceProxy dataSourceProxy = new DataSourceProxy(originalDataSource);
获取连接代理:从代理数据源中获取 ConnectionProxy
,使每个数据库连接支持事务传播。
Connection connection = dataSourceProxy.getConnection();
4. 全局事务上下文管理
基于 GlobalTransactionContext
获取或创建事务对象:
GlobalTransaction transaction = GlobalTransactionContext.getCurrentOrCreate();
绑定全局事务 XID: 当事务发起时,将全局事务的 XID 存储在 RootContext
中:
RootContext.bind(transaction.getXid());
通过 RootContext
判断事务状态:
boolean isInTransaction = RootContext.inGlobalTransaction();
5. 事务操作实现
开启事务:
transaction.begin();
提交事务:
try { transaction.commit(); } finally { RootContext.unbind(); }
回滚事务:
try { transaction.rollback(); } finally { RootContext.unbind(); }
6. 整合业务逻辑
将分布式事务的核心逻辑封装在工具类中,例如 SeataTransactionHolder
,以便方便地管理全局事务上下文:
SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
7. 清理资源
在应用关闭时,清理客户端资源:
TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy();
8. 注意事项
- 确保所有数据源通过
DataSourceProxy
代理,避免事务管理失效。 - 配置数据库支持 Undo Log 表,确保事务回滚记录正常存储。
- 调试过程中,检查 Seata Server 日志和应用日志,定位事务协调的问题。
通过上述步骤,可以在业务代码中顺利集成 Seata,实现分布式事务管理,保障数据一致性。
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。
各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。
负责:
- 中央/分销预订系统性能优化
- 活动&券等营销中台建设
- 交易平台及数据中台等架构和开发设计
- 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
- LLM Agent应用开发
- 区块链应用开发
- 大数据开发挖掘经验
- 推荐系统项目
目前主攻市级软件项目设计、构建服务全社会的应用系统。
参考:
- 编程严选网
这篇关于ShardingSphere如何轻松驾驭Seata柔性分布式事务?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16ShardingSphere 如何完美驾驭分布式事务与 XA 协议?
- 2024-11-16Maven资料入门指南
- 2024-11-16Maven资料入门教程
- 2024-11-16MyBatis Plus资料:新手入门教程与实践指南
- 2024-11-16MyBatis-Plus资料入门教程:快速上手指南
- 2024-11-16Mybatis资料入门教程:新手必看指南
- 2024-11-16MyBatis资料详解:新手入门与初级实战指南
- 2024-11-16MyBatisPlus资料:初学者入门指南与实用教程
- 2024-11-16MybatisPlus资料详解:初学者入门指南
- 2024-11-16MyBatisX资料:新手入门与初级教程