做设计找素材都有什么网站,无需下载直接进入的网站的代码,国内新闻摘抄,wordpress 软件公司概述
本文将主要解读 PolarDB-X 中事务部分的相关代码#xff0c;着重解读事务的一生在计算节点#xff08;CN#xff09;中的关键代码#xff1a;从开始、执行、到最后提交这一整个生命周期。
在阅读本文前#xff0c;强烈推荐先阅读与 PolarDB-X 事务系统相关的文章着重解读事务的一生在计算节点CN中的关键代码从开始、执行、到最后提交这一整个生命周期。
在阅读本文前强烈推荐先阅读与 PolarDB-X 事务系统相关的文章
PolarDB-X 强一致分布式事务原理PolarDB-X 分布式事务的实现一PolarDB-X 分布式事务的实现二InnoDB CTS 扩展PolarDB-X 分布式事务的实现四跨地域事务无处不在的 MySQL XA 事务
以及此前发布的 PolarDB-X SQL 的一生
事务与连接
在 PolarDB-X 的 CN 层与事务关系密切的是连接。这是因为数据节点DN也具备单个 DN 内的事务能力CN 则通过与 DN 的连接来管理 DN 上的事务从而实现强一致的分布式事务能力。其中涉及到的连接大致如下图所示 先简单说一下这里面涉及的一些连接。ServerConnection 类似于前端连接大部分的 SQL 语句执行的入口都是 ServerConnection#innerExecute。TConnection 中的 executeSQL 方法负责 SQL 语句的真正执行也负责创建新的事务对象。TConnection 会一直引用着这个事务对象直到事务提交或回滚。事务对象里有一个 TransactionConnectionHolder负责管理该事务用到的所有物理连接CN 连接 DN 的私有协议连接。值得一提的是ExecutionContext 作为一条逻辑 SQL 执行的上下文也会引用这个事务对象。这样后续执行器需要使用物理连接与 DN 通信时就可以通过 ExecutionContext 拿到事务对象再通过事务对象的 TransactionConnectionHolder 拿到合适的物理连接。
以上的各种连接都会在下文继续讨论。
两个例子
接下来我们以两个简单的例子来说明事务的一生在 CN 的代码中是如何体现的。
测试用表
CREATE TABLE tb1 (id int PRIMARY KEY,a int
) DBPARTITION BY HASH(id)
先在里面插入几条数据
INSERT INTO tb1 VALUES (0, 0), (1, 1), (2, 2), (3, 3);
测试使用的两个例子
-- Example 1:
BEGIN;
SELECT * FROM tb1 WHERE id 0;
UPDATE tb1 SET a 100 WHERE id 1;
COMMIT;
-- Example 2:
BEGIN;
SELECT * FROM tb1 WHERE id 0;
UPDATE tb1 SET a 101 WHERE id 1;
UPDATE tb1 SET a 101 WHERE id 0;
COMMIT;
注意到例 2 只比 例 1 多修改了 id 1 的数据。测试表是按 id 拆分的因此 id 0 和 id 1 的记录会落在不同的物理分片上假设分别为分片 0 和分片 1。例 1 读了分片 0写了分片 1然后提交了事务这将会触发我们对单分片写的“一阶段提交优化”。例 2 读了分片 0随后写了分片 1 和 分片 0然后提交了事务这将会进行完整的分布式事务提交流程。这两个例子还会触发“只读连接优化”即只有在第一次写的时候才真正开启分布式事务。
在接下来的讨论中我们默认使用 TSO 事务策略和 RR 的隔离级别。
例 1 事务的一生
BEGIN
与 MySQL 类似要开启一个事务一般有两种方式。第一种方式是显式地执行 BEGIN 或 START TRANSACTION [transaction_characteristic]执行这两种语句会调用 ServerConnection 中的 begin(boolean, IsolationLevel) 方法。第二种方式是执行 SET autocommit 0当前 session 会隐式开启事务这种方式会调用 ServerConnection 中的 setAutocommit(boolean, boolean) 方法。两种方式都会调用 TConnection 的 setAutoCommit 方法。这些方法都只是简单地记录了一些变量比如 transaction_characteristic 中设定的事务相关变量同时标记这个连接开启了事务。此时事务对象也还没创建出来也没有与后端连接进行任何交互。
读分片 0
在开启事务后执行 SELECT * FROM tb1 WHERE id 0 时才会真正创建事务对象。根据事务与连接中的讨论在 TConnection 中这条逻辑 SQL 的执行入口为 executeSQL里面会真正创建事务对象主要执行逻辑为代码出自 PolarDB-X 5.4.12 release 版本为了方便说明有删减及改动下同
// TConnection#executeSQL(ByteString, Parameters, TStatement, ExecutionContext)
public ResultSet executeSQL(ByteString sql, Parameters params, TStatement stmt,ExecutionContext executionContext) throws SQLException {if (this.trx null || this.trx.isClosed()) {// 开启事务后直到执行第一条语句才会创建事务对象。beginTransaction();}// 让 executionContext 引用 trx 对象方便后续执行器通过 trx 对象拿物理连接。executionContext.setTransaction(this.trx);resultCursor executeQuery(sql, executionContext, trxPolicyModified);
}
// TConnection#beginTransaction(boolean)
private void beginTransaction(boolean autoCommit) {// 根据一些默认的或用户设定的事务变量选择合适的事务策略比如 TSO/XA 等。trxPolicy loadTrxPolicy(executionContext);TransactionClass trxConfig trxPolicy.getTransactionType(autoCommit, readOnly);// 根据事务策略创建出对应的事务对象。this.trx transactionManager.createTransaction(trxConfig, executionContext);
}
在我们的例子中如果在上述代码打个断点可以看到创建出来的是 TsoTransaction其中一些值得关注的变量为
trx {TsoTransaction}// 此时还没有获取任何时间戳。snapshotTimestamp -1commitTimestamp -1// 事务写的第一个物理分片下称主分片事务日志将会写在这个分片上。primaryGroup null// 该事务是否跨分片如果是单分片事务会优化为一阶段提交。isCrossGroup false// 事务日志管理器负责写事务日志。globalTxLogManager {GlobalTxLogManager}// 分布式事务的 connectionHolder 都是 TransactionConnectionHolder// 里面分别存储了读写连接读连接和写连接在提交时行为会有所不同。connectionHolder {TransactionConnectionHolder}// 物理分片到对应写连接的映射。groupHeldWriteConn {HashMap}// 物理分片到对应读连接集合到映射。在 ShareReadView 优化开启时// 可以同时存在多个读连接和一个写连接因此这里读写连接需要分开管理。// 该优化不在本文展开。groupHeldReadConns {HashMap}
在执行器阶段会选择给分片 0 下发一条 SELECT 语句此时需要获取分片 0 的物理连接代码入口是 MyJdbcHandler 中的 getPhyConnection(ITransaction, ITransaction.RW, String, DataSource) 方法。其中的事务对象 Transaction 则是从 ExecutionContext 里拿到。该方法最后会调用 AbstractTransaction 这是所有分布式事务类的基类中的 getConnection 方法。
通过事务拿物理连接的代码 AbstractTransaction#getConnection 如下
// AbstractTransaction#getConnection(String, String, IDataSource, RW, ExecutionContext)
public IConnection getConnection(String schema, String group, IDataSource ds, RW rw, ExecutionContext ec) {if (/* 是事务的第一个写请求 */) {// 把这个分片作为主分片事务日志将写在这个分片上。this.primaryGroup group;// 该分片还用于生成 XA 事务的 xid。this.primaryGroupUid IServerConfigManager.getGroupUniqueId(schema, group);}// 通过 connectionHolder 拿到物理连接。IConnection conn connectionHolder.getConnection(schema, group, ds, rw);if (/* 是写请求 */ !isCrossGroup !this.primaryGroup.equals(group)) {// 事务涉及了多个分片。this.isCrossGroup true;}return conn;
}
在我们的例子中上述参数 group 是物理分片 0rw 是 READ说明需要物理分片 0 上的读连接ds 则主要用于生成物理连接。由于我们只是读请求因此分片 0 不会作为主分片会直接返回 connectionHolder.getConnection(schema, group, ds, rw) 的结果。
通过连接管理器拿物理连接的代码 TransactionConnectionHolder#getConnection 如下
// TransactionConnectionHolder#getConnection(String, String, IDataSource, RW)
public IConnection getConnection(String schema, String group, IDataSource ds, RW rw) {// 尝试获取该分片上的写连接如果有直接返回这个写连接。HeldConnection groupWriteConn groupHeldWriteConn.get(group);if (groupWriteConn ! null) {return groupWriteConn.connection;}HeldConnection freeReadConn /* 尝试找到读连接 */;if (freeReadConn ! null) {if (/* 当前需要写连接 */) {// 设置当前连接为写连接participated true 意味着该连接是写连接。freeReadConn.participated true;this.groupHeldWriteConn.put(group, freeReadConn);// 由于原本是读连接这里才真正开启分布式事务。this.trx.commitNonParticipant(group, freeReadConn.connection);this.trx.begin(schema, freeReadConn.group, freeReadConn.connection);}return freeReadConn.connection;}// 当前分片没有任何连接创建一个新的连接。还会根据读写类型// 设置好写连接 groupHeldWriteConn 或读连接集合 groupHeldReadConns。IConnection conn new DeferredConnection(/* 这里会获取并封装该分片的私有协议连接 */);if (/* 需要写连接 */) {// 开启正常的分布式事务。this.trx.begin(schema, group, conn);} else {// 优化为只读事务。this.trx.beginNonParticipant(group, conn);}return conn;
}
在我们的例子中由于是第一条语句该分片上还没有任何连接因此会先生成一个连接该分片的私有协议连接包装成 DeferredConnection然后因为是读请求会调用 beginNonParticipant 。
TsoTransaction 的 beginNonParticipant 方法如下
// TsoTransaction#beginNonParticipant(String, IConnection)
protected void beginNonParticipant(String group, IConnection conn) throws SQLException {if (snapshotTimestamp 0) {// 该事务从未拿过时间戳则在这里获取。snapshotTimestamp nextTimestamp();}// 使用私有协议的流水线执行机制执行 BEGIN。conn.executeLater(BEGIN);// 在 BEGIN 后发送时间戳。sendSnapshotSeq(conn);
}
在我们的例子中私有协议连接会流水线执行 BEGIN 语句非阻塞不等结果返回且在稍后执行物理 SQL 时才发送时间戳。至此连接上的一些初始化操作已经完成可以向执行器返回并执行读分片 0 的物理 SQL。
写分片 1
随后我们执行 UPDATE tb1 SET a 100 WHERE id 1。在执行器阶段需要给分片 1 下发一条 UPDATE 语句此时需要获取分片 1 的物理连接因此又会调用 AbstractTransaction#getConnection 方法通过事务对象拿物理连接。通过前面贴出的代码我们发现由于是事务的第一个写请求因此分片 1 会视作主分片用于生成 xid 和稍后记录事务日志。
在获取物理连接时又会调用 TransactionConnectionHolder#getConnection 方法通过连接管理器拿物理连接。通过前面贴出的代码我们发现由于分片 1 没有任何连接因此会生成一个私有协议连接包装成 DeferredConnection。与读分片 0 不同由于是写请求会执行 TsoTransaction 的 begin 方法。
TsoTransaction 的 begin 方法如下
// TsoTransaction#begin(String, String, IConnection)
protected void begin(String schema, String group, IConnection conn) throws SQLException {if (snapshotTimestamp 0) {// 该事务从未拿过时间戳则在这里获取。snapshotTimestamp nextTimestamp();}// 获取 xid。String xid getXid(group);// 触发私有协议流水线执行 XA START。conn.executeLater(XA START xid);// 在 XA START 后发送时间戳。sendSnapshotSeq(conn);
}
简单来说和此前 beginNonParticipant 方法唯一的区别在于使用了 XA START 开启事务。根据MySQL 关于 XA 事务的说明xid 由 gtrid [, bqual [, formatID ]] 组成这里的 gtrid 是“drds-事务 id 主分片Uid”这样保证了同一个事务在不同分片上执行 XA START会使用相同的 gtrid。bqual 设置当前连接的分片用于在事务恢复时确定分支事务所在的分片。 在我们的例子中xid 为drds-13e101d74e4000005ae6c3b5be613cd1, DB1_000001_GROUP 其中 13e101d74e400000 是事务 id5ae6c3b5be613cd1 是分片 1 的 UidDB1_000001_GROUP 是分片 1 的具体分片名称。值得注意的是这里会把之前的时间戳发送到分片 1。至此连接上的一些初始化操作已经完成可以向执行器返回并执行写分片 1 的物理 SQL。
COMMIT
最后执行 COMMIT 提交事务。处理 COMMIT 的代码入口为 ServerConnection#commit()其主要调用了 TConnection#commit 方法代码如下
// TConnection#commit()
public void commit() throws SQLException {try {// 触发事务的提交流程。this.trx.commit();} finally {// 大部分情况下如果事务提交成功或出现异常后正确处理了// 所有连接会被关闭并释放trx.close() 相当于什么也不做。// 但如果事务提交失败且没有处理这里会回滚事务并释放所有物理连接。this.trx.close();// 去掉对这个事务对象的引用意味着当前连接里这个事务一生的结束。this.trx null;}
}
我们重点关注一下事务的提交流程上述 this.trx.commit() 调用时会调用 ShareReadViewTransaction该类继承了 AbstractTransaction基于 XA 事务实现了共享 readview 的功能XATransaction 和 TsoTransaction 都会继承这个类在这里可以简单理解为 XA 事务的基类的 commit 方法代码如下
// ShareReadViewTransaction#commit()
public void commit() {if (!isCrossGroup) {// 如果只涉及了单个分片的写进行一阶段提交优化。commitOneShardTrx();} else {// 正常的多分片分布式事务提交流程。commitMultiShardTrx();}
}
在我们的例子中由于只写了分片 1所以进入一阶段提交优化代码如下
// ShareReadViewTransaction#commitOneShardTrx()
protected void commitOneShardTrx() {// 对持有的所有物理连接执行以下流程以提交每个物理连接开启的事务。forEachHeldConnection((group, conn, participated) - {if (!participated) {// 只读连接是 BEGIN 开启事务的且只有读操作执行 ROLLBACK 即可。conn.execute(ROLLBACK);} else {// 获取 xid。String xid getXid(group);// 写连接是 XA START 开启事务的执行 XA END 和 XA COMMIT ONE PHASE 提交事务。conn.execute(XA END xid ; XA COMMIT xid ONE PHASE);}});// 所有连接都提交了分支事务释放并清空这些物理连接。connectionHolder.closeAllConnections();
}
我们一共持有了 2 个物理连接。对于分片 0 的只读连接会直接执行 ROLLBACK对于分片 1 的写连接则执行 XA END 和 XA COMMIT ONE PHASE 提交事务。注意到我们并没有获取 commit timestamp因为在一阶段提交优化里commit timestamp 会由 InnoDB 计算生成
具体的计算规则是COMMIT_TS MAX_SEQUENCE 1其中 MAX_SEQUENCE 为 InnoDB 本地维护的历史最大的 snapshot_ts。如果提交失败了会调用事务的 close 方法代码如下
// AbstractTransaction#close()
public void close() {// 回滚所有物理连接上的事务。cleanupAllConnections();// 释放并清空这些物理连接。connectionHolder.closeAllConnections();
}
值得一提的是cleanupAllConnections() 也是 ROLLBACK 语句主要调用的方法。因此为了同时了解 ROLLBACK 语句执行流程我们也看一下 cleanupAllConnections 方法的代码
// AbstractTransaction#cleanupAllConnections()
protected final void cleanupAllConnections() {// 对持有的所有物理连接执行以下流程以回滚每个物理连接开启的事务。forEachHeldConnection((group, conn, participated) - {if (conn.isClosed()) { return; }if (!participated) {// 只读连接是 BEGIN 开启事务的且只有读操作执行 ROLLBACK 即可。conn.execute(ROLLBACK);} else {// 获取 xid。String xid getXid(group);// 写连接是 XA START 开启事务的执行 XA END 和 XA ROLLBACK 回滚事务。conn.execute(XA END xid ; XA ROLLBACK xid);}});
}
可以看到回滚的逻辑是看情况执行 ROLLBACK 或 XA ROLLBACK 来回滚事务的。
至此我们看到了例 1 事务的一生。接下来看一下多分片写的事务流程。
例 2 事务的一生
写分片 0
例 2 中从开启事务到执行 UPDATE tb1 SET a 100 WHERE id 1 走的流程和例 1 一样直到执行 UPDATE tb1 SET a 100 WHERE id 0 时才有所不同。具体而言事务在之前就获取了分片 0 的只读连接当执行到 TransactionConnectionHolder#getConnection 时会先提交只读连接上的事务实际执行了 ROLLBACK然后在这条连接上用 XA START 开启 XA 事务设置好时间戳就可以把这个物理连接返回给执行器最终执行物理 SQL。由于这是第二个写连接因此还会设置事务为跨分片事务以触发正常的两阶段提交流程。
COMMIT
例 2 的重点在于写了 2 个分片因此 COMMIT 时会调用 commitMultiShardTrx() 走多分片的分布式提交流程。方法 commitMultiShardTrx 代码如下
// TsoTransaction#commitMultiShardTrx()
protected void commitMultiShardTrx() {// 事务日志的提交状态一共有 3 种FAILUREUNKNODWNSUCCESS。TransactionCommitState commitState TransactionCommitState.FAILURE;try {// 对所有连接执行 XA prepare。prepareConnections();// 拿 commit 时间戳。commitTimestamp nextTimestamp();Connection logConn /* 获取主分片上的连接 */;// 所有分支事务 prepare 成功在写事务日志前状态设置为 UNKNOWN其作用见后续代码说明。commitState TransactionCommitState.UNKNOWN;// 写事务日志。writeCommitLog(logConn);// 写事务日志成功状态置为 SUCCESS。commitState TransactionCommitState.SUCCESS;} catch (RuntimeException ex) {exception ex;}if (commitState TransactionCommitState.FAILURE) {// 写事务日志前失败了回滚所有连接。rollbackConnections();} else if (commitState TransactionCommitState.SUCCESS) {// 写事务日志成功了提交所有连接。commitConnections();} else {// 所有连接都 prepare 成功了但无法确定是否写入了事务日志// 将由事务恢复的逻辑来决定是 COMMIT 还是 ROLLBACK。这里先丢弃所有连接。discardConnections();}// 释放并清空这些物理连接。connectionHolder.closeAllConnections();// prepare 或 commit 阶段抛出的异常这里重新抛出。if (exception ! null) {throw exception;}
}
上述代码正是两阶段提交的流程。
在 prepare 阶段调用 prepareConnections()其中对只读连接只是简单执行 ROLLBACK 语句对于写连接执行 XA END {xid} 和 XA PREPARE {xid} 语句。
如果所有连接都 prepare 成功了在 commit 阶段调用 writeCommitLog(logConn)用主分片在我们的例子中是分片 1的连接写下事务日志主要包括了事务的 id 和 commit 时间戳事务日志主要用于后续的事务恢复。
如果事务日志写成功了就意味着 commit 成功了会调用 commitConnections() 对所有连接进行提交这一步只用对写连接设置 commit 时间戳 SET innodb_commit_seq {commitTimestamp} 和执行 XA COMMIT {xid} 语句。
如果在写事务日志前失败了会调用 rollbackConnections() 对所有连接进行回滚主要会对写连接执行 XA ROLLBACK {xid} 回滚。
如果无法确定是否成功写入了事务日志事务的状态会是 UNKNOWN。此时我们能确定所有分支事务都成功 prepare 了如果事务日志写入成功了则要进行 XA COMMIT如果事务日志没有写入则要进行 XA ROLLBACK。由于不确定是要提交还是回滚我们会丢弃所有物理连接使这些连接后续不再可用。至于事务最终是提交还是回滚则交给事务恢复线程来处理接下来我们会解读事务恢复相关的代码。
事务恢复
一个分布式事务在提交的时候可能遇到各种情况导致提交失败。例如所有分支事务都 prepare 成功了事务日志也写入成功了但 XA COMMIT 失败了。对于这种情况事务恢复时需要正确提交所有分支事务。另一种情况是部分分支事务 prepare 成功了另外一些失败了或事务日志没有写入成功。对于这种情况事务恢复需要回滚掉已 prepare 的分支事务。
事务恢复主要由 XARecoverTask 负责。其主体代码为 recoverInstance 方法该方法检查一个 DN 下的所有 prepare 过的事务并根据事务状态提交或回滚这些事务代码如下
// XARecoverTask#recoverInstance(IDataSource, SetString)
// dataSource 用户获取 DN 的连接groups 是当前逻辑库的所有物理分片
private void recoverInstance(IDataSource dataSource, SetString groups) {// 执行 XA RECOVER 获取该 DN 上所有 prepare 的事务。// 此处省略了从 dataSource 获取连接和生成 statement 的代码。ResultSet rs stmt.executeQuery(XA RECOVER);while (rs.next()) {// 对每一条记录生成对象 PreparedXATrans// 主要包括 xid事务id分支事务所在的分片主分片等信息。PreparedXATrans trans /* 从 rs 中获取一行数据生成 */;if (/* trans 所在分片是当前逻辑库的一个物理分片 trans 在上一次 recover 任务时也出现过即较长时间都未被提交或回滚*/) {// 介入处理这个分支事务回滚或提交。rollBackOrForward(trans, stmt);}}
}
该任务每 5 秒到每个 DN 上执行一次 XA RECOVER得到所有 prepare 过的事务如果看到一个事务在上一次任务中也出现过即至少过去 5 秒都没有提交或回滚则会选择对这个事务进行回滚或提交。相关逻辑代码为 rollBackOrForward代码如下
// XARecoverTask#rollBackOrForward(PreparedXATrans, Statement)
private boolean rollBackOrForward(PreparedXATrans trans, Statement stmt) throws SQLException {String primaryGroup /* 从 trans 里解析出主分片详见前文关于 xid 的生成 */;// 尝试从主分片的事务日志中找到相关的事务日志。GlobalTxLog tx GlobalTxLogManager.get(primaryGroup, trans.transId);if (tx ! null) {// 确实存在事务日志根据事务日志状态判断回滚或提交。if (tx.getState() TransactionState.ABORTED) {// ABORTED 状态的事务需要回滚。return tryRollback(stmt, trans);} else {// SUCCESS 状态的事务需要提交。return tryCommitTSO(stmt, trans, tx.getCommitTimestamp());}} else {// 没有找到事务日志尝试回滚先开启事务写下事务日志标记事务为 ABORTED。try (Connection conn2 /* 获取主分片上的另一个连接 */;) {conn2.setAutocommit(false);// 尝试回滚如果因为别的线程正在处理这个事务//比如该事务只是提交得慢连接仍未断开还在提交流程// 而报错就回滚上一条写事务日志的语句。txLog.append(transInfo.transId, TransactionType.XA, TransactionState.ABORTED, new ConnectionContext(), conn2);stmt.execute(XA ROLLBACK trans.toXid());conn2.commit();return true;} catch (Exception e) {/* 根据异常判断是否要回滚写事务日志的操作 */}}
}
该方法主要有 3 段逻辑。
一是如果存在对应的事务日志且事务状态是 ABORTED那就执行 tryRollback 方法回滚该方法主要执行了 XA ROLLBACK {xid}。
二是如果事务状态是 SUCCESS那就执行 tryCommitTSO 方法回滚该方法会设置 commit 时间戳然后执行 XA COMMIT {xid}。
三是如果没找到事务日志此时一般有两种可能1事务提交失败了且没写下事务日志此时需要回滚2事务还在两阶段提交的流程中只是 prepare 较慢还没开始写事务日志此时不需要做任何操作。在 MySQL 中如果发起 XA START 的连接没有关闭其他连接是无法通过 XA ROLLBACK 或 XA COMMIT 来回滚或提交这个分支事务的。我们利用这一特性首先在事务日志插入一条 ABORTED 记录表明这个分布式事务需要回滚然后尝试执行 XA ROLLBACK 回滚这个分支事务。如果遇到 2的情况则会收到特定的报错此时再回滚掉插入 ABORTED 事务日志的操作。在我们插入 ABORTED 事务日志后原本正在提交事务的线程在插入 SUCCESS 事务日志时会被阻塞而在我们回滚掉插入 ABORTED 事务日志的操作后事务提交流程就会继续进行下去。
小结
本文主要解读了 PolarDB-X 中 CN 端的事务相关的代码以 TSO 事务为主使用两个例子一步步地展示了事务的开启、执行、提交、恢复等流程。希望大家阅读本文后能更加了解 PolarDB-X 的事务系统。
原文链接
本文为阿里云原创内容未经允许不得转载。