面试官:Spring事务是如何传播的?
bigegpt 2025-04-28 23:30 4 浏览
前言
上一篇分析了事务注解的解析过程,本质上是将事务封装为切面加入到AOP的执行链中,因此会调用到MethodInceptor的实现类的invoke方法,而事务切面的Interceptor就是TransactionInterceptor,所以本篇直接从该类开始。
事务切面的调用过程
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
这个方法本身没做什么事,主要是调用了父类的invokeWithinTransaction方法,注意最后一个参数,传入的是一个lambda表达式,而这个表达式中的调用的方法应该不陌生,在分析AOP调用链时,就是通过这个方法传递到下一个切面或是调用被代理实例的方法,忘记了的可以回去看看。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
//获取事务属性类 AnnotationTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
//获取方法上面有@Transactional注解的属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//获取事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 调用proceed方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//事务提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 省略了else
}
这个方法逻辑很清晰,一目了然,if里面就是对声明式事务的处理,先调用
createTransactionIfNecessary方法开启事务,然后通过
invocation.proceedWithInvocation调用下一个切面,如果没有其它切面了,就是调用被代理类的方法,出现异常就回滚,否则提交事务,这就是Spring事务切面的执行过程。但是,我们主要要搞懂的就是在这些方法中是如何管理事务以及事务在多个方法之间是如何传播的。
事务的传播性概念
传播性是Spring自己搞出来的,数据库是没有的,因为涉及到方法间的调用,那么必然就需要考虑事务在这些方法之间如何流转,所以Spring提供了7个传播属性供选择,可以将其看成两大类,即是否支持当前事务:
1.支持当前事务(在同一个事务中):
- PROPAGATION_REQUIRED:支持当前事务,如果不存在,就新建一个事务。
- PROPAGATION_MANDATORY:支持当前事务,如果不存在,就抛出异常。
- PROPAGATION_SUPPORTS:支持当前事务,如果不存在,就不使用事务。
2.不支持当前事务(不在同一个事务中):
- PROPAGATION_NEVER:以非事务的方式运行,如果有事务,则抛出异常。
- PROPAGATION_NOT_SUPPORTED:以非事务的方式运行,如果有事务,则挂起当前事务。
- PROPAGATION_REQUIRES_NEW:新建事务,如果有事务,挂起当前事务(两个事务相互独立,父事务回滚不影响子事务)。
- PROPAGATION_NESTED:如果当前事务存在,则嵌套事务执行(指必须依存父事务,子事务不能单独提交且父事务回滚则子事务也必须回滚,而子事务若回滚,父事务可以回滚也可以捕获异常)。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
别看属性这么多,实际上我们主要用的是PROPAGATION_REQUIRED默认属性,一些特殊业务下可能会用到PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED。下面我会假设一个场景,并主要分析这三个属性。
public class A {
@Autowired
private B b;
@Transactional
public void addA() {
b.addB();
}
}
public class B {
@Transactional
public void addB() {
// doSomething...
}
}
上面我创建了A、B两个类,每个类中有一个事务方法,使用了声明式事务并采用的默认传播属性,在A中调用了B的方法。
当请求来了调用addA时,首先调用的是代理对象的方法,因此会进入
createTransactionIfNecessary方法开启事务:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//开启事务,这里重点看
status = tm.getTransaction(txAttr);
}
else {
}
}
//创建事务信息对象,记录新老事务信息对象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
实际上开启事务是通过
AbstractPlatformTransactionManager做的,而这个类是一个抽象类,具体实例化的对象就是我们在项目里常配置的
DataSourceTransactionManager对象。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//这里重点看,.DataSourceTransactionObject拿到对象
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
//第一次进来connectionHolder为空的,所以不存在事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//第一次进来大部分会走这里
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//先挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//创建事务状态对象,其实就是封装了事务对象的一些信息,记录事务状态的
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务,重点看看 DataSourceTransactionObject
doBegin(transaction, definition);
//开启事务后,改变事务状态
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
这个方法流程比较长,一步步来看,先调用doGetTransaction方法获取一个
DataSourceTransactionObject对象,这个类是
JdbcTransactionObjectSupport的子类,在父类中持有了一个ConnectionHolder对象,见名知意,这个对象保存了当前的连接。
protected Object doGetTransaction() {
//管理connection对象,创建回滚点,按照回滚点回滚,释放回滚点
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
//DataSourceTransactionManager默认是允许嵌套事务的
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//obtainDataSource() 获取数据源对象,其实就是数据库连接块对象
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
追溯getResource方法可以看到ConnectionHolder 是从ThreadLocal里获取的,也就是当前线程,key是DataSource对象;但是仔细思考下我们是第一次进来,所以这里肯定获取不到的,反之,要从这里获取到值,那必然是同一个线程第二次及以后进入到这里,也就是在addA调用addB时,另外需要注意这里保存ConnectionHolder到
DataSourceTransactionObject对象时是将newConnectionHolder属性设置为false了的。
继续往后,创建完transaction对象后,会调用isExistingTransaction判断是否已经存在一个事务,如果存在就会调用handleExistingTransaction方法,这个方法就是处理事务传播的核心方法,因为我们是第一次进来,肯定不存在事务,所以先跳过。
再往后,可以看到就是处理不同的传播属性,主要看到下面这个部分:
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//先挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//创建事务状态对象,其实就是封装了事务对象的一些信息,记录事务状态的
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务,重点看看 DataSourceTransactionObject
doBegin(transaction, definition);
//开启事务后,改变事务状态
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
第一次进来时,PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED都会进入到这里,首先会调用suspend挂起当前存在的事务,在这里没啥作用。接下来通过newTransactionStatus创建了DefaultTransactionStatus对象,这个对象主要就是存储当前事务的一些状态信息,需要特别注意newTransaction属性设置为了true,表示是一个新事务。状态对象创建好之后就是通过doBegin开启事务,这是一个模板方法:
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//如果没有数据库连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//从连接池里面获取连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//把连接包装成ConnectionHolder,然后设置到事务对象中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//从数据库连接中获取隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
//关闭连接的自动提交,其实这步就是开启了事务
con.setAutoCommit(false);
}
//设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,其他事务所提交的数据,该事务将看不见!
//设置只读事务就是告诉数据库,我这个事务内没有新增,修改,删除操作只有查询操作,不需要数据库锁等操作,减少数据库压力
prepareTransactionalConnection(con, definition);
//自动提交关闭了,就说明已经开启事务了,事务是活动的
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
//如果是新创建的事务,则建立当前线程和数据库连接的关系
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
这个方法里面主要做了六件事:
- 首先从连接池获取连接并保存到DataSourceTransactionObject对象中。
- 关闭数据库的自动提交,也就是开启事务。
- 获取数据库的隔离级别。
- 根据属性设置该事务是否为只读事务。
- 将该事务标识为活动事务(transactionActive=true)。
- 将ConnectionHolder对象与当前线程绑定。
完成之后通过prepareSynchronization将事务的属性和状态设置到
TransactionSynchronizationManager对象中进行管理。最后返回到
createTransactionIfNecessary方法中创建TransactionInfo对象与当前线程绑定并返回。
通过以上的步骤就开启了事务,接下来就是通过proceedWithInvocation调用其它切面,这里我们先假设没有其它切面了,那么就是直接调用到A类的addA方法,在这个方法中又调用了B类的addB方法,那么肯定也是调用到代理类的方法,因此又会进入到
createTransactionIfNecessary方法中。但这次进来通过isExistingTransaction判断是存在事务的,因此会进入到handleExistingTransaction方法:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//不允许有事务,直接异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//以非事务方式执行操作,如果当前存在事务,就把当前事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//挂起
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//默认是可以嵌套事务的
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//创建回滚点
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// 省略
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
这里面也是对每个传播属性的判断,先看PROPAGATION_REQUIRES_NEW的处理,因为该属性要求每次调用都开启一个新的事务,所以首先会将当前事务挂起,怎么挂起呢?
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
//第一次进来,肯定为null的
if (transaction != null) {
//吧connectionHolder设置为空
suspendedResources = doSuspend(transaction);
}
//做数据还原操作
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
//解除绑定关系,
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
这里明显是进入第一个if并且会调用到doSuspend方法,整体来说挂起事务很简单:首先将
DataSourceTransactionObject的ConnectionHolder设置为空并解除与当前线程的绑定,之后将解除绑定的ConnectionHolder和其它属性(事务名称、隔离级别、只读属性)通通封装到SuspendedResourcesHolder对象,并将当前事务的活动状态设置为false。挂起事务之后又通过newTransactionStatus创建了一个新的事务状态并调用doBegin开启事务,这里不再重复分析。
接着来看PROPAGATION_NESTED:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//默认是可以嵌套事务的
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//创建回滚点
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
这里面可以看到如果允许嵌套事务,就会创建一个DefaultTransactionStatus对象(注意newTransaction是false,表明不是一个新事务)和回滚点;如果不允许嵌套,就会创建新事务并开启。
当上面的判断都不满足时,也就是传播属性为默认PROPAGATION_REQUIRED时,则只是创建了一个newTransaction为false的DefaultTransactionStatus返回。
完成之后又是调用proceedWithInvocation,那么就是执行B类的addB方法,假如没有发生异常,那么就会回到切面调用
commitTransactionAfterReturning提交addB的事务:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
public final void commit(TransactionStatus status) throws TransactionException {
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
// 如果是nested,没有提交,只是将savepoint清除掉了
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
//如果都是PROPAGATION_REQUIRED,最外层的才会走进来统一提交,如果是PROPAGATION_REQUIRES_NEW,每一个事务都会进来
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
}
}
finally {
cleanupAfterCompletion(status);
}
}
主要逻辑在processCommit方法中。如果存在回滚点,可以看到并没有提交事务,只是将当前事务的回滚点清除了;而如果是新事务,就会调用doCommit提交事务,也就是只有PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务能提交。事务提交完成后会调用cleanupAfterCompletion清除当前事务的状态,如果有挂起的事务还会通过resume恢复挂起的事务(将解绑的连接和当前线程绑定以及将之前保存的事务状态重新设置回去)。当前事务正常提交后,那么就会轮到addA方法提交,处理逻辑同上,不再赘述。
如果调用addB发生异常,就会通过
completeTransactionAfterThrowing进行回滚:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
}
}
}
public final void rollback(TransactionStatus status) throws TransactionException {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
//按照嵌套事务按照回滚点回滚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
//都为PROPAGATION_REQUIRED最外层事务统一回滚
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
流程和提交是一样的,先是判断有没有回滚点,如果有就回到到回滚点并清除该回滚点;如果没有则判断是不是新事务(PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务),满足则直接回滚当前事务。回滚完成后同样需要清除掉当前的事务状态并恢复挂起的连接。另外需要特别注意的是在catch里面调用完回滚逻辑后,还通过throw抛出了异常,这意味着什么?意味着即使是嵌套事务,内层事务的回滚也会导致外层事务的回滚,也就是addA的事务也会跟着回滚。
至此,事务的传播原理分析完毕,深入看每个方法的实现是很复杂的,但如果仅仅是分析各个传播属性对事务的影响,则有一个简单的方法。我们可以将内层事务切面等效替换掉
invocation.proceedWithInvocation方法,比如上面两个类的调用可以看作是下面这样:
// addA的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// addB的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//事务提交
commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);
这样看是不是很容易就能分析出事务之间的影响以及是提交还是回滚了?下面来看几个实例分析。
实例分析
我再添加一个C类,和addC的方法,然后在addA里面调用这个方法。
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// addB的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
b.addB();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);
// addC的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
c.addC();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);
等效替换后就是上面这个代码,我们分别来分析。
都是PROPAGATION_REQUIRED属性:通过上面的分析,我们知道三个方法都是同一个连接和事务,那么任何一个出现异常则都会回滚。
addB为PROPAGATION_REQUIRES_NEW:
- 如果B中抛出异常,那么B中肯定会回滚,接着异常向上抛,导致A事务整体回滚;
- 如果C中抛出异常,不难看出C和A都会回滚,但B已经提交了,因此不会受影响。
- addC为PROPAGATION_NESTED,addB为PROPAGATION_REQUIRES_NEW:
- 如果B中抛出异常,那么B回滚并抛出异常,A也回滚,C不会执行;
- 如果C中抛出异常,先是回滚到回滚点并抛出异常,所以A也回滚,但B此时已经提交,不受影响。
- 都是PROPAGATION_NESTED:虽然创建了回滚点,但是仍然是同一个连接,任何一个发生异常都会回滚,如果不想影响彼此,可以try-catch生吞子事务的异常实现。
还有其它很多情况,这里就不一一列举了,只要使用上面的分析方法都能够很轻松的分析出来。
总结
本篇详细分析了事务的传播原理,另外还有隔离级别,这在Spring中没有体现,需要我们自己结合数据库的知识进行分析设置。最后我们还需要考虑声明式事务和编程式事务的优缺点,声明式事务虽然简单,但不适合用在长事务中,会占用大量连接资源,这时就需要考虑利用编程式事务的灵活性了。总而言之,事务的使用并不是一律默认就好,接口的一致性和吞吐量与事务有着直接关系,严重情况下可能会导致系统崩溃。
作者:夜勿语
原文链接:
https://blog.csdn.net/l6108003/article/details/106696735
- 上一篇:我爱你,永远爱你
- 下一篇:Spring事务传播机制
相关推荐
- 得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践
-
一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...
- warm-flow新春版:网关直连和流程图重构
-
本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...
- 扣子空间体验报告
-
在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...
- spider-flow:开源的可视化方式定义爬虫方案
-
spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...
- solon-flow 你好世界!
-
solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...
- 新一代开源爬虫平台:SpiderFlow
-
SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...
- 通过 SQL 训练机器学习模型的引擎
-
关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...
- 鼠须管输入法rime for Mac
-
鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...
- Go语言 1.20 版本正式发布:新版详细介绍
-
Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...
- iOS 10平台SpriteKit新特性之Tile Maps(上)
-
简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...
- 程序员简历例句—范例Java、Python、C++模板
-
个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...
- Telerik UI for iOS Q3 2015正式发布
-
近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...
- ios使用ijkplayer+nginx进行视频直播
-
上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...
- IOS技术分享|iOS快速生成开发文档(一)
-
前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...
- macOS下配置VS Code C++开发环境
-
本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- httperror403.14-forbidden (63)
- logstashinput (65)
- hadoop端口 (65)
- dockernetworkconnect (63)
- esxi7 (63)
- vue阻止冒泡 (67)
- c#for循环 (63)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- java大写转小写 (63)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)