百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Seata源码—6.Seata AT模式的数据源代理二

bigegpt 2025-06-18 19:16 2 浏览

大纲

1.Seata的Resource资源接口源码

2.Seata数据源连接池代理的实现源码

3.Client向Server发起注册RM的源码

4.Client向Server注册RM时的交互源码

5.数据源连接代理与SQL句柄代理的初始化源码

6.Seata基于SQL句柄代理执行SQL的源码

7.执行SQL语句前取消自动提交事务的源码

8.执行SQL语句前后构建数据镜像的源码

9.构建全局锁的key和UndoLog数据的源码

10.Seata Client发起分支事务注册的源码

11.Seata Server处理分支事务注册请求的源码

12.将UndoLog写入到数据库与提交事务的源码

13.通过全局锁重试策略组件执行事务的提交

14.注册分支事务时获取全局锁的入口源码

15.Seata Server获取全局锁的具体逻辑源码

16.全局锁和分支事务及本地事务总结

17.提交全局事务以及提交各分支事务的源码

18.全局事务回滚的过程源码


11.Seata Server处理分支事务注册请求的源码

(1)Seata Server收到分支事务注册请求后的处理

(2)
BranchRegisterRequest.handle()的处理

(3)
DefaultCore.branchRegister()的处理


(1)Seata Server收到分支事务注册请求后的处理

Seata Server收到Seata Client发送过来的分支事务注册请求后,首先会将分支事务注册请求交给ServerOnRequestProcessor的process()方法进行处理,然后再将请求交给DefaultCoordinator的onRequest()方法进行处理。

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
            processMessage(ctx, (RpcMessage) msg);
        }
    }
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
            //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
            //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private final RemotingServer remotingServer;
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }
    
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        //RpcContext线程本地变量副本
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            //the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
            //此时传入的msg其实就是客户端发送请求时的BranchRegisterRequest对象
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            //返回响应给客户端
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        //此时传入的request其实就是客户端发送请求时的BranchRegisterRequest对象
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

(2)
BranchRegisterRequest.handle()的处理

在DefaultCoordinator的onRequest()方法中,会调用BranchRegisterRequest的handle()方法来处理分支事务注册请求,该handle()方法又会调用DefaultCoordinator的doBranchRegister()方法,所以最后会调用DefaultCore的branchRegister()方法来具体处理分支事务注册请求。

public class BranchRegisterRequest extends AbstractTransactionRequestToTC  {
    ...
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    ...
}

public interface TCInboundHandler {
    ...
    //Handle branch register response.
    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
}

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    ...
    @Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response) throws TransactionException {
                try {
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }
    
    //Do branch register.
    protected abstract void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException;
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private final DefaultCore core;
    ...
    @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //调用DefaultCore的branchRegister()方法处理分支事务注册请求
        response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), request.getXid(), request.getApplicationData(), request.getLockKey()));
    }
    ...
}

(3)
DefaultCore.branchRegister()的处理

DefaultCore的branchRegister()方法其实会继续调用其抽象父类AbstractCore的branchRegister()方法来处理注册分支事务请求,具体的过程如下:


一.根据xid获取全局事务会话

二.根据全局事务会话创建分支事务会话

三.通过MDC将分支事务ID存到线程本地变量副本

四.注册分支事务需要先获取全局锁

五.把分支事务会话加入到全局事务会话中并持久化

public class DefaultCore implements Core {
    private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
    
    public DefaultCore(RemotingServer remotingServer) {
        List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class, new Class[] {RemotingServer.class}, new Object[] {remotingServer});
        if (CollectionUtils.isNotEmpty(allCore)) {
            for (AbstractCore core : allCore) {
                coreMap.put(core.getHandleBranchType(), core);
            }
        }
    }
    
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
    }
    
    public AbstractCore getCore(BranchType branchType) {
        AbstractCore core = coreMap.get(branchType);
        if (core == null) {
            throw new NotSupportYetException("unsupported type:" + branchType.name());
        }
        return core;
    }
    ...
}

public abstract class AbstractCore implements Core {
    protected RemotingServer remotingServer;
    
    public AbstractCore(RemotingServer remotingServer) {
        if (remotingServer == null) {
            throw new IllegalArgumentException("remotingServer must be not null");
        }
        this.remotingServer = remotingServer;
    }
    
    //注册分支事务
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        //1.根据xid获取全局事务会话GlobalSession
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

            //2.创建分支事务会话BranchSession,根据全局事务开启一个分支事务
            //传入的参数依次是:全局事务会话、事务类型、资源ID、应用数据、全局锁keys、客户端ID
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);

            //3.把分支事务的ID存放到线程本地变量副本中,也就是MDC中
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

            //4.注册分支事务时会获取全局锁
            //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
            branchSessionLock(globalSession, branchSession);

            try {
                //5.把分支事务会话加入到全局事务会话中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }
    
    private GlobalSession assertGlobalSessionNotNull(String xid, boolean withBranchSessions) throws TransactionException {
        //根据xid寻找全局事务会话GlobalSession
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid, withBranchSessions);
        if (globalSession == null) {
            throw new GlobalTransactionException(TransactionExceptionCode.GlobalTransactionNotExist, String.format("Could not found global transaction xid = %s, may be has finished.", xid));
        }
        return globalSession;
    }
    
    //获取全局锁,获取全局锁失败则抛异常
    protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    }
    ...
}

public class SessionHolder {
    ...
    //根据xid获取全局事务会话GlobalSession
    public static GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
        return getRootSessionManager().findGlobalSession(xid, withBranchSessions);
    }
    ...
}

@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize {
    ...
    //根据xid获取全局事务会话GlobalSession
    @Override
    public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
        return transactionStoreManager.readSession(xid, withBranchSessions);
    }
    ...
}

public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
    ...
    //根据xid获取全局事务会话GlobalSession
    @Override
    public GlobalSession readSession(String xid, boolean withBranchSessions) {
        //global transaction
        GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
        if (globalTransactionDO == null) {
            return null;
        }
        //branch transactions
        List<BranchTransactionDO> branchTransactionDOs = null;
        //reduce rpc with db when branchRegister and getGlobalStatus
        if (withBranchSessions) {
            branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
        }
        return getGlobalSession(globalTransactionDO, branchTransactionDOs);
    }
    ...
}

public class SessionHelper {
    ...
    //创建分支事务会话
    public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId, String applicationData, String lockKeys, String clientId) {
        BranchSession branchSession = new BranchSession();
        branchSession.setXid(globalSession.getXid());
        branchSession.setTransactionId(globalSession.getTransactionId());
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);
        branchSession.setApplicationData(applicationData);
        return branchSession;
    }
    ...
}

public class GlobalSession implements SessionLifecycle, SessionStorable {
    private List<BranchSession> branchSessions;
    ...
    //把分支事务会话加入到全局事务会话中
    @Override
    public void addBranch(BranchSession branchSession) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onAddBranch(this, branchSession);
        }
        branchSession.setStatus(BranchStatus.Registered);
        add(branchSession);
    }
    
    //把分支事务会话加入到全局事务会话中
    public boolean add(BranchSession branchSession) {
        if (null != branchSessions) {
            return branchSessions.add(branchSession);
        } else {
            //db and redis no need to deal with
            return true;
        }
    }
    ...
}

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
    ...
    @Override
    public void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        addBranchSession(globalSession, branchSession);
    }
    
    @Override
    public void addBranchSession(GlobalSession session, BranchSession branchSession) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, branchSession, LogOperation.BRANCH_ADD);
        }
        writeSession(LogOperation.BRANCH_ADD, branchSession);
    }
    
    //持久化全局事务会话
    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        //transactionStoreManager.writeSession()会对全局事务会话进行持久化
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            ...
        }
    }
    ...
}


12.将UndoLog写入到数据库与提交事务的源码

在数据源连接代理ConnectionProxy的
processGlobalTransactionCommit()方法中:

一.首先会注册完分支事务

二.然后会将UndoLog写入到数据库

三.最后才提交目标数据源连接的事务

//数据源连接代理
public class ConnectionProxy extends AbstractConnectionProxy {
    private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
    ...
    @Override
    public void commit() throws SQLException {
        try {
            //通过全局锁重试策略组件LockRetryPolicy来执行本地事务的提交
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    
    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    
    private void processLocalCommitWithGlobalLocks() throws SQLException {
        //检查全局锁keys
        checkLock(context.buildLockKeys());
        try {
            //目标数据源连接提交事务
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }
    
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            //1.注册分支事务
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //2.将UndoLog写入到数据库
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //3.目标数据源连接提交事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    ...
}

public class UndoLogManagerFactory {
    private static final Map<String, UndoLogManager> UNDO_LOG_MANAGER_MAP = new ConcurrentHashMap<>();
    //获取UndoLog管理器
    public static UndoLogManager getUndoLogManager(String dbType) {
        return CollectionUtils.computeIfAbsent(UNDO_LOG_MANAGER_MAP, dbType,
            key -> EnhancedServiceLoader.load(UndoLogManager.class, dbType));
    }
}

public abstract class AbstractUndoLogManager implements UndoLogManager {
    ...
    @Override
    public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        ConnectionContext connectionContext = cp.getContext();
        if (!connectionContext.hasUndoLog()) {
            return;
        }

        String xid = connectionContext.getXid();
        long branchId = connectionContext.getBranchId();

        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchId);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

        UndoLogParser parser = UndoLogParserFactory.getInstance();
        byte[] undoLogContent = parser.encode(branchUndoLog);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
        }

        CompressorType compressorType = CompressorType.NONE;
        if (needCompress(undoLogContent)) {
            compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
            undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
        }

        //插入UndoLog到数据库
        insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
    }
    
    //insert uodo log when normal
    protected abstract void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException;
    ...
}

@LoadLevel(name = JdbcConstants.MYSQL)
public class MySQLUndoLogManager extends AbstractUndoLogManager {
    ...
    @Override
    protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException {
        insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
    }
    
    private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, State state, Connection conn) throws SQLException {
        try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
            pst.setLong(1, branchId);
            pst.setString(2, xid);
            pst.setString(3, rollbackCtx);
            pst.setBytes(4, undoLogContent);
            pst.setInt(5, state.getValue());
            pst.executeUpdate();
        } catch (Exception e) {
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        }
    }
    ...
}


13.通过全局锁重试策略组件执行事务的提交

当设置完禁止自动提交事务、构建前镜像、执行SQL、构建后镜像,执行到数据源连接代理ConnectionProxy的commit()方法提交本地事务时,便会通过全局锁重试策略LockRetryPolicy来执行本地事务的提交。


全局锁重试策略LockRetryPolicy,会确保先获取到全局锁才提交本地事务。也就是如果获取不到全局锁,则重试获取。此外,注册分支事务时,获取到全局锁才能注册成功

public class ConnectionProxy extends AbstractConnectionProxy {
    private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
    ...
    @Override
    public void commit() throws SQLException {
        try {
            //通过全局锁重试策略组件LockRetryPolicy来执行本地事务的提交
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    ...
    public static class LockRetryPolicy {
        protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory.getInstance().
            getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);
        protected final ConnectionProxy connection;
        public LockRetryPolicy(ConnectionProxy connection) {
            this.connection = connection;
        }
        public <T> T execute(Callable<T> callable) throws Exception {
            //the only case that not need to retry acquire lock hear is
            //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && connection#autoCommit == true
            //because it has retry acquire lock when AbstractDMLBaseExecutor#executeAutoCommitTrue
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) {
                //不需要重试
                return callable.call();
            } else {
                //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == false
                //or LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && autoCommit == false
                return doRetryOnLockConflict(callable);
            }
        }

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    //AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
                    if (connection.getContext().isAutoCommitChanged() && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                        lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
                    }
                    //休眠一会再去重试
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }
     
        //Callback on exception in doLockRetryOnConflict.
        protected void onException(Exception e) throws Exception {
        }
    }
    ...
}

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
        LockRetryPolicy(final ConnectionProxy connection) {
            super(connection);
        }

        @Override
        public <T> T execute(Callable<T> callable) throws Exception {
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
                return doRetryOnLockConflict(callable);
            } else {
                return callable.call();
            }
        }

        @Override
        protected void onException(Exception e) throws Exception {
            ConnectionContext context = connection.getContext();
            //UndoItems can't use the Set collection class to prevent ABA
            context.removeSavepoint(null);
            //回滚目标数据源连接对SQL的执行
            connection.getTargetConnection().rollback();
        }

        public static boolean isLockRetryPolicyBranchRollbackOnConflict() {
            return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT;
        }
    }
    ...
}


14.注册分支事务时获取全局锁的入口源码

在Seata Server中,只有当全局锁获取成功后,分支事务才能注册成功。AbstractCore的branchRegister()方法会通过调用ATCore的branchSessionLock()方法来获取全局锁,而ATCore的branchSessionLock()方法最终则是靠调用AbstractLockManager的acquireLock()方法来尝试获取全局锁的。获取全局锁失败会抛出异常,说明注册分支事务失败。

public abstract class AbstractCore implements Core {
    ...
    //注册分支事务
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        //1.根据xid获取全局事务会话GlobalSession
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

            //2.创建分支事务会话,根据全局事务开启一个分支事务
            //传入的参数依次是:全局事务会话、事务类型、资源ID、应用数据、全局锁keys、客户端ID
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);

            //3.把分支事务的ID存放到线程本地变量副本中,也就是MDC中
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

            //4.注册分支事务时会加全局锁
            //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
            branchSessionLock(globalSession, branchSession);

            try {
                //5.把分支事务会话加入到全局事务会话中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }
    ...
}

public class ATCore extends AbstractCore {
    ...
    @Override
    protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        //从应用程序数据里提取出一些属性进行属性赋值
        String applicationData = branchSession.getApplicationData();
        boolean autoCommit = true;
        boolean skipCheckLock = false;

        if (StringUtils.isNotBlank(applicationData)) {
            if (objectMapper == null) {
                objectMapper = new ObjectMapper();
            }
            try {
                //ObjectMapper是一个对象映射框架,它可以把ApplicationData对象里的属性值读取出来,然后写入到HashMap里
                Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);
                Object clientAutoCommit = data.get(AUTO_COMMIT);
                if (clientAutoCommit != null && !(boolean)clientAutoCommit) {
                    autoCommit = (boolean)clientAutoCommit;
                }
                Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);
                if (clientSkipCheckLock instanceof Boolean) {
                    skipCheckLock = (boolean)clientSkipCheckLock;
                }
            } catch (IOException e) {
                LOGGER.error("failed to get application data: {}", e.getMessage(), e);
            }
        }

        try {
            //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
            if (!branchSession.lock(autoCommit, skipCheckLock)) {
                throw new BranchTransactionException(LockKeyConflict, String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));
            }
        } catch (StoreException e) {
            if (e.getCause() instanceof BranchTransactionException) {
                throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(), String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));
            }
            throw e;
        }
    }
    ...
}

public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {
    ...
    public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (this.getBranchType().equals(BranchType.AT)) {
            //尝试获取全局锁
            return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
        }
        return true;
    }
    ...
}

public class LockerManagerFactory {
    private static final Configuration CONFIG = ConfigurationFactory.getInstance();
    private static volatile LockManager LOCK_MANAGER;
    
    public static LockManager getLockManager() {
        if (LOCK_MANAGER == null) {
            init();
        }
        return LOCK_MANAGER;
    }
    
    public static void init() {
        init(null);
    }
    
    public static void init(String lockMode) {
        if (LOCK_MANAGER == null) {
            synchronized (LockerManagerFactory.class) {
                if (LOCK_MANAGER == null) {
                    if (StringUtils.isBlank(lockMode)) {
                        lockMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
                    }
                    if (StoreMode.contains(lockMode)) {
                        LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode);
                    }
                }
            }
        }
    }
}

public abstract class AbstractLockManager implements LockManager {
    ...
    @Override
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            //no lock
            return true;
        }
        //get locks of branch
        //获取到分支事务里需要的所有行锁
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            //no lock
            return true;
        }
        //具体进行获取锁
        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
    }
    
    @Override
    public List<RowLock> collectRowLocks(BranchSession branchSession) {
        if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {
            return Collections.emptyList();
        }

        String lockKey = branchSession.getLockKey();
        String resourceId = branchSession.getResourceId();
        String xid = branchSession.getXid();
        long transactionId = branchSession.getTransactionId();
        long branchId = branchSession.getBranchId();

        return collectRowLocks(lockKey, resourceId, xid, transactionId, branchId);
    }
    
    protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId, Long branchID) {
        List<RowLock> locks = new ArrayList<>();
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                return locks;
            }
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            if (StringUtils.isBlank(mergedPKs)) {
                return locks;
            }
            String[] pks = mergedPKs.split(",");
            if (pks == null || pks.length == 0) {
                return locks;
            }
            for (String pk : pks) {
                if (StringUtils.isNotBlank(pk)) {
                    RowLock rowLock = new RowLock();
                    rowLock.setXid(xid);
                    rowLock.setTransactionId(transactionId);
                    rowLock.setBranchId(branchID);
                    rowLock.setTableName(tableName);
                    rowLock.setPk(pk);
                    rowLock.setResourceId(resourceId);
                    locks.add(rowLock);
                }
            }
        }
        return locks;
    }
    ...
}

public class RowLock {
    private String xid;//全局事务xid
    private Long transactionId;//全局事务ID    
    private Long branchId;//分支事务ID    
    private String resourceId;//资源ID
    private String tableName;//表名称
    private String pk;//主键    
    private String rowKey;//行键
    private String feature;//功能特性
    ...
}


15.Seata Server获取全局锁的具体逻辑源码

调用AbstractLockManager的acquireLock()方法获取全局锁时,其实调用的是DataBaseLocker的acquireLock()方法 -> LockStoreDataBaseDAO的acquireLock()方法。


LockStoreDataBaseDAO的acquireLock()方法中,首先会查询数据库中是否存在要申请的全局锁的记录,然后根据这些锁记录 + xid判断是否由当前全局事务获取的(这是核心)。


如果不是,则说明其他全局事务先获取到了要申请的全局锁,此时当前事务获取全局锁失败。


如果是,则把当前事务已经获取过的全局锁过滤出来,然后尝试写入当前分支事务还需获取的全局锁记录到数据库。如果写入成功,则表示当前分支事务成功获取到全局锁。如果写入失败,则表示其他分支事务已经获取到全局锁

@LoadLevel(name = "db")
public class DataBaseLockManager extends AbstractLockManager implements Initialize {
    private Locker locker;
    
    @Override
    public void init() {
        //init dataSource
        String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
        DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
        locker = new DataBaseLocker(lockStoreDataSource);
    }

    @Override
    public Locker getLocker(BranchSession branchSession) {
        return locker;
    }
    ...
}

public class DataBaseLocker extends AbstractLocker {
    private LockStore lockStore;
    
    public DataBaseLocker(DataSource logStoreDataSource) {
        lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
    }
    ...
    
    @Override
    public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {
        if (CollectionUtils.isEmpty(locks)) {
            //no lock
            return true;
        }
        try {
            //通过执行MySQL来获取全局锁
            return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);
        } catch (StoreException e) {
            throw e;
        } catch (Exception t) {
            LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);
            return false;
        }
    }
    ...
}

public class LockStoreDataBaseDAO implements LockStore {
    ...
    @Override
    public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
        //数据库操作三剑客:连接、句柄、结果
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;

        Set<String> dbExistedRowKeys = new HashSet<>();
        boolean originalAutoCommit = true;
        if (lockDOs.size() > 1) {
            lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
        }
        try {
            //从全局锁数据源里获取到一个连接
            conn = lockStoreDataSource.getConnection();
            //关闭自动提交事务
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            //需要获取的锁,有可能多个
            List<LockDO> unrepeatedLockDOs = lockDOs;

            //check lock
            if (!skipCheckLock) {
                boolean canLock = true;

                //query,针对全局锁表查询某个数据加了全局锁的全局事务xid
                //LockStoreSqlFactory是全局锁存储的SQL工厂
                String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
                ps = conn.prepareStatement(checkLockSQL);
                for (int i = 0; i < lockDOs.size(); i++) {
                    ps.setString(i + 1, lockDOs.get(i).getRowKey());
                }
                //执行查询
                rs = ps.executeQuery();

                //获取到当前要加全局锁的事务xid
                String currentXID = lockDOs.get(0).getXid();
                boolean failFast = false;

                //如果查询到的结果rs是空,则表示当前全局锁没有被事务获取占用
                while (rs.next()) {
                    String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);

                    //如果获取到全局锁的是别的全局事务xid,那么获取全局锁失败,设置canLock为false
                    if (!StringUtils.equals(dbXID, currentXID)) {
                        if (LOGGER.isInfoEnabled()) {
                            String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                            String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                            long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                            LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                        }
                        if (!autoCommit) {
                            int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                            if (status == LockStatus.Rollbacking.getCode()) {
                                failFast = true;
                            }
                        }
                        canLock = false;
                        break;
                    }

                    dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
                }

                if (!canLock) {
                    conn.rollback();
                    if (failFast) {
                        throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                    }
                    return false;
                }

                //If the lock has been exists in db, remove it from the lockDOs
                if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                    //过滤当前事务已经获取过的全局锁
                    unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())).collect(Collectors.toList());
                }
                if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                    conn.rollback();
                    return true;
                }
            }

            //lock
            if (unrepeatedLockDOs.size() == 1) {
                LockDO lockDO = unrepeatedLockDOs.get(0);
                //尝试加锁,表示全局锁被当前的分支事务获取了
                if (!doAcquireLock(conn, lockDO)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                    }
                    conn.rollback();
                    return false;
                }
            } else {
                if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                    }
                    conn.rollback();
                    return false;
                }
            }
            conn.commit();
            return true;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(rs, ps);
            if (conn != null) {
                try {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }
    }
    
    protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
        PreparedStatement ps = null;
        try {
            //insert
            String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
            ps = conn.prepareStatement(insertLockSQL);
            ps.setString(1, lockDO.getXid());//全局事务xid
            ps.setLong(2, lockDO.getTransactionId());//全局事务id
            ps.setLong(3, lockDO.getBranchId());//分支事务id
            ps.setString(4, lockDO.getResourceId());//资源id
            ps.setString(5, lockDO.getTableName());//表名称
            ps.setString(6, lockDO.getPk());//主键
            ps.setString(7, lockDO.getRowKey());//rowkey
            ps.setInt(8, LockStatus.Locked.getCode());//locked
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(ps);
        }
    }
    ...
}

相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...