SpringFramework源码解析——@Transactional是如何生效的?
1. 背景
相比于JDBC
的编程式事务,Spring提供了一种更便捷的处理事务的方式,即声明式事务。我们只需要在方法或类上标注@Transactional
注解便可完成事务的声明,如下示例:
@Transactional(rollbackFor = Exception.class)
public void pay() {
jdbcTemplate.execute("update balance = balance + 1 where id = 1");
jdbcTemplate.execute("update balance = balance - 1 where id = 2");
}
当上面的方法抛出异常时,整个事务会进行回滚。下面我们来看@Transactional
注解是如何生效的?
2. 声明式事务是如何实现的?
声明式事务是在事务切面中完成了事务的创建、提交和回滚。
2.1. 事务切面
事务切面在配置类ProxyTransactionManagementConfiguration
中配置,切面相关要素如下:
- 事务切点:事务属性来源切点
TransactionAttributeSourcePointcut
。 - 事务通知:事务拦截器
TransactionInterceptor
。 - 事务切面:Bean工厂事务属性来源切面
BeanFactoryTransactionAttributeSourceAdvisor
。
2.2. 事务切点
事务切点由事务属性来源切点TransactionAttributeSourcePointcut
实现,在以下几种情况会进行方法的拦截:
- 当类能够被事务属性来源
TransactionAttributeSource
认定为候选类时。 - 当方法和类能够被事务属性来源
TransactionAttributeSource
获取到事务属性TransactionAttribute
时。
@Override
public boolean matches(Method method, Class<?> targetClass) {
// 事务属性来源为空或存在事务属性时,切点匹配
return (this.transactionAttributeSource == null ||
this.transactionAttributeSource.getTransactionAttribute(method, targetClass) != null);
}
private class TransactionAttributeSourceClassFilter implements ClassFilter {
@Override
public boolean matches(Class<?> clazz) {
if (TransactionalProxy.class.isAssignableFrom(clazz) ||
TransactionManager.class.isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
return false;
}
// 是否是候选类
return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz));
}
}
对应源码所在位置:
TransactionAttributeSourcePointcut#matches
。
2.3. 事务通知
事务通知由事务拦截器TransactionInterceptor
实现,其完成了事务方法的拦截处理。
3. 事务拦截器是如何执行的?
事务拦截器的执行流程:
- 获取事务属性;
- 如果必要则创建事务;
- 执行方法;
- 抛出异常时决定是否需要回滚事务;
- 执行正常返回时提交事务。
对应源码所在位置:
TransactionAspectSupport#invokeWithinTransaction
。
3.1. 获取事务属性
3.1.1. 事务属性
事务属性TransactionAttribute
是对事务信息的抽象,其包含以下属性:
propagationBehavior
:事务的传播行为。isolationLevel
:事务的隔离级别。timeout
:事务的超时时间。readOnly
:事务是否只读。rollbackOn
:事务需要回滚的异常。
3.1.2. 事务属性来源
事务属性来源TransactionAttributeSource
提供了getTransactionAttribute
方法,其拥有从方法和类上获取事务属性TransactionAttribute
的能力。
从实现上看,注解事务属性来源AnnotationTransactionAttributeSource
将获取事务属性TransactionAttribute
的逻辑委派给了事务注解转换器TransactionAnnotationParser
。
- 从方法
Method
上获取事务属性TransactionAttribute
。 - 从类
Class
上获取事务属性TransactionAttribute
。 - 退路到接口方法获取事务属性
TransactionAttribute
。
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow non-public methods, as configured.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
// 事务仅允许公共方法,且方法的修饰符不是公共的,返回空
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
// 获取具体的方法
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
// 从方法上获取事务属性
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// 从类上获取事务属性
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// 退路到接口方法获取事务属性
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
return determineTransactionAttribute(clazz);
}
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
TransactionAttribute attr = parser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}
3.1.3. 事务注解转换器
事务注解转换器TransactionAnnotationParser
提供了parseTransactionAnnotation
方法,其拥有将注解转换为事务属性TransactionAttribute
的能力。从实现上看,Spring事务注解转换器SpringTransactionAnnotationParser实现了将
@Transactional注解转为事务属性
TransactionAttribute`的逻辑。
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
// 获取事务注解
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
// 创建基于规则的事务属性
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
String timeoutString = attributes.getString("timeoutString");
Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
"Specify 'timeout' or 'timeoutString', not both");
rbta.setTimeoutString(timeoutString);
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
rbta.setLabels(Set.of(attributes.getStringArray("label")));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
对应源码所在位置:
AbstractFallbackTransactionAttributeSource#getTransactionAttribute
。
3.2. 如果必要则创建事务
事务的传播行为决定了当存在或不存在事务时执行的行为,如下表格:
传播行为 | 不存在事务 | 存在事务 |
---|---|---|
REQUIRED | 启动新事务 | 加入到当前事务 |
SUPPORTS | 以非事务的方式执行 | 加入到当前事务 |
MANDATORY | 抛出异常 | 加入到当前事务 |
REQUIRES_NEW | 启动新事务 | 挂起当前事务,并且启动新事务 |
NOT_SUPPORTED | 以非事务的方式执行 | 挂起当前事务,并且以非事务的方式执行 |
NEVER | 以非事务的方式执行 | 抛出异常 |
NESTED | 启动新事务 | 启动嵌套事务 |
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取当前事务
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果存在事务,进行相应处理
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果不存在事务,进行相应处理
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 当前不存在事务,且传播行为为 MANDATORY,则抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 当前不存在事务,且传播行为为 REQUIRED 或 REQUIRES_NEW 或 NESTED,则启动新的事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
// 当前不存在事务,且传播行为为 SUPPORTS 或 NOT_SUPPORTED 或 NEVER,则以非事务的方式执行
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 当前存在事务,且传播行为为 NEVER,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 当前存在事务,且传播行为为 NOT_SUPPORTED,则挂起当前事务,以非事务的方式执行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 当前存在事务,且传播行为为 REQUIRES_NEW,则挂起当前事务,并启动新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 当前存在事务,且传播行为为 NESTED,则启用嵌套事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
// 当前存在事务,且传播行为为 SUPPORTS 或 REQUIRED 或 MANDATORY,则加入到当前事务中
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
对应源码所在位置:
TransactionAspectSupport#createTransactionIfNecessary
。
3.3. 执行方法
执行被代理对象的方法。
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 在捕获到异常后,决定是否需要回滚事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除事务信息
cleanupTransactionInfo(txInfo);
}
对应源码所在位置:
TransactionAspectSupport.InvocationCallback#proceedWithInvocation
。
3.4. 抛出异常时决定是否需要回滚事务
当执行方法异常时,根据异常类型决定是否需要回滚事务。需要回滚时则回滚事务;不需要回滚时则提交事务。
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 判断是否需要回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 需要回滚则回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
// 不需要回滚则提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
由基于规则的事务属性RuleBasedTransactionAttribute
来决定是否需要回滚,其rollbackOn
方法会获取最接近抛出异常的声明异常,决定是否回滚。
@Override
public boolean rollbackOn(Throwable ex) {
RollbackRuleAttribute winner = null;
int deepest = Integer.MAX_VALUE;
if (this.rollbackRules != null) {
for (RollbackRuleAttribute rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
// 获取最接近的规则
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}
// User superclass behavior (rollback on unchecked) if no rule matches.
if (winner == null) {
return super.rollbackOn(ex);
}
return !(winner instanceof NoRollbackRuleAttribute);
}
回滚规则属性RollbackRuleAttribute
提供了getDepth
方法,以获取异常的深度。
// 获取指定异常进行规则匹配的深度
public int getDepth(Throwable exception) {
return getDepth(exception.getClass(), 0);
}
private int getDepth(Class<?> exceptionType, int depth) {
if (this.exceptionType != null) {
if (this.exceptionType.equals(exceptionType)) {
// Found it!
return depth;
}
}
// 包含规则即可
else if (exceptionType.getName().contains(this.exceptionPattern)) {
// Found it!
return depth;
}
// If we've gone as far as we can go and haven't found it...
if (exceptionType == Throwable.class) {
return -1;
}
return getDepth(exceptionType.getSuperclass(), depth + 1);
}
对应源码所在位置:
TransactionAspectSupport#completeTransactionAfterThrowing
。
3.5. 执行正常返回时提交事务
执行方法正常结束时,提交事务。
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
对应源码所在位置:
TransactionAspectSupport#commitTransactionAfterReturning
。