个人随笔
目录
一个简单的例子来探寻Spring事务Trasanction执行源码的主脉络(二):执行代理方法
2021-08-14 18:29:50

通过上一篇文章一个简单的例子来探寻SpringAOP执行源码的主脉络(二):执行代理方法 我们知道了AOP怎么执行代理方法,主要是靠对应的Advisor,我们知道spring会为我们的切面里面的方法比befor,after,around等方法都生成对应的Advisor,所以我们估计spring事务的代理方法逻辑跟AOP是差不多的,主要看的就是对应的Advisor里面的执行逻辑,在一个简单的例子来探寻Spring事务Trasanction执行源码的主脉络(一):创建事务代理对象文章中,我们已经知道了事务对应的Advisor是BeanFactoryTransactionAttributeSourceAdvisor,那应该是跑不了了。

我们来调试下看看

果然不出预料,那么前面的逻辑肯定是跟AOP差不多的。

  1. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  2. Object oldProxy = null;
  3. boolean setProxyContext = false;
  4. TargetSource targetSource = this.advised.targetSource;
  5. Object target = null;
  6. try {
  7. if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
  8. // The target does not implement the equals(Object) method itself.
  9. return equals(args[0]);
  10. }
  11. else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
  12. // The target does not implement the hashCode() method itself.
  13. return hashCode();
  14. }
  15. else if (method.getDeclaringClass() == DecoratingProxy.class) {
  16. // There is only getDecoratedClass() declared -> dispatch to proxy config.
  17. return AopProxyUtils.ultimateTargetClass(this.advised);
  18. }
  19. else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
  20. method.getDeclaringClass().isAssignableFrom(Advised.class)) {
  21. // Service invocations on ProxyConfig with the proxy config...
  22. return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
  23. }
  24. Object retVal;
  25. if (this.advised.exposeProxy) {
  26. // Make invocation available if necessary.
  27. oldProxy = AopContext.setCurrentProxy(proxy);
  28. setProxyContext = true;
  29. }
  30. // Get as late as possible to minimize the time we "own" the target,
  31. // in case it comes from a pool.
  32. target = targetSource.getTarget();
  33. Class<?> targetClass = (target != null ? target.getClass() : null);
  34. // Get the interception chain for this method.
  35. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
  36. // Check whether we have any advice. If we don't, we can fallback on direct
  37. // reflective invocation of the target, and avoid creating a MethodInvocation.
  38. if (chain.isEmpty()) {
  39. // We can skip creating a MethodInvocation: just invoke the target directly
  40. // Note that the final invoker must be an InvokerInterceptor so we know it does
  41. // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
  42. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
  43. retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
  44. }
  45. else {
  46. // We need to create a method invocation...
  47. MethodInvocation invocation =
  48. new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
  49. // Proceed to the joinpoint through the interceptor chain.
  50. retVal = invocation.proceed();
  51. }
  52. // Massage return value if necessary.
  53. Class<?> returnType = method.getReturnType();
  54. if (retVal != null && retVal == target &&
  55. returnType != Object.class && returnType.isInstance(proxy) &&
  56. !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
  57. // Special case: it returned "this" and the return type of the method
  58. // is type-compatible. Note that we can't help if the target sets
  59. // a reference to itself in another returned object.
  60. retVal = proxy;
  61. }
  62. else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
  63. throw new AopInvocationException(
  64. "Null return value from advice does not match primitive return type for: " + method);
  65. }
  66. return retVal;
  67. }
  68. finally {
  69. if (target != null && !targetSource.isStatic()) {
  70. // Must have come from TargetSource.
  71. targetSource.releaseTarget(target);
  72. }
  73. if (setProxyContext) {
  74. // Restore old proxy.
  75. AopContext.setCurrentProxy(oldProxy);
  76. }
  77. }
  78. }

不过在这里我们发现

  1. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

这里生成的调用链里面的对象是TransactionInterceptor这个是该advisor里面的advice(通知)的invoke方法,在生成代理对象的步骤中也会把这个bean加入到容器中的。前面的调用模式跟AOP一个样,接下来我们直接进入到这个对象的invoke方法

  1. @Override
  2. @Nullable
  3. public Object invoke(MethodInvocation invocation) throws Throwable {
  4. // Work out the target class: may be {@code null}.
  5. // The TransactionAttributeSource should be passed the target class
  6. // as well as the method, which may be from an interface.
  7. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  8. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  9. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  10. }

我们直接进入到

  1. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);

方法中,真相应该就在这里

  1. @Nullable
  2. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  3. final InvocationCallback invocation) throws Throwable {
  4. // If the transaction attribute is null, the method is non-transactional.
  5. //这个是我们的那个advisor
  6. TransactionAttributeSource tas = getTransactionAttributeSource();
  7. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  8. //这个是我们之前指定的:DataSourceTransactionManager
  9. final TransactionManager tm = determineTransactionManager(txAttr);
  10. if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
  11. ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
  12. if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
  13. throw new TransactionUsageException(
  14. "Unsupported annotated transaction on suspending function detected: " + method +
  15. ". Use TransactionalOperator.transactional extensions instead.");
  16. }
  17. ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
  18. if (adapter == null) {
  19. throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
  20. method.getReturnType());
  21. }
  22. return new ReactiveTransactionSupport(adapter);
  23. });
  24. return txSupport.invokeWithinTransaction(
  25. method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
  26. }
  27. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  28. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  29. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  30. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  31. //这里是创建事务,如果需要的话,也就是如果你在被代理的方法那里加了@Transation注解的话,这里就会创建一个事务
  32. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  33. Object retVal;
  34. try {
  35. // This is an around advice: Invoke the next interceptor in the chain.
  36. // This will normally result in a target object being invoked.
  37. retVal = invocation.proceedWithInvocation();
  38. }
  39. catch (Throwable ex) {
  40. // target invocation exception
  41. completeTransactionAfterThrowing(txInfo, ex);
  42. throw ex;
  43. }
  44. finally {
  45. cleanupTransactionInfo(txInfo);
  46. }
  47. if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  48. // Set rollback-only in case of Vavr failure matching our rollback rules...
  49. TransactionStatus status = txInfo.getTransactionStatus();
  50. if (status != null && txAttr != null) {
  51. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  52. }
  53. }
  54. commitTransactionAfterReturning(txInfo);
  55. return retVal;
  56. }
  57. else {
  58. Object result;
  59. final ThrowableHolder throwableHolder = new ThrowableHolder();
  60. // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
  61. try {
  62. result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
  63. TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
  64. try {
  65. Object retVal = invocation.proceedWithInvocation();
  66. if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  67. // Set rollback-only in case of Vavr failure matching our rollback rules...
  68. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  69. }
  70. return retVal;
  71. }
  72. catch (Throwable ex) {
  73. if (txAttr.rollbackOn(ex)) {
  74. // A RuntimeException: will lead to a rollback.
  75. if (ex instanceof RuntimeException) {
  76. throw (RuntimeException) ex;
  77. }
  78. else {
  79. throw new ThrowableHolderException(ex);
  80. }
  81. }
  82. else {
  83. // A normal return value: will lead to a commit.
  84. throwableHolder.throwable = ex;
  85. return null;
  86. }
  87. }
  88. finally {
  89. cleanupTransactionInfo(txInfo);
  90. }
  91. });
  92. }
  93. catch (ThrowableHolderException ex) {
  94. throw ex.getCause();
  95. }
  96. catch (TransactionSystemException ex2) {
  97. if (throwableHolder.throwable != null) {
  98. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  99. ex2.initApplicationException(throwableHolder.throwable);
  100. }
  101. throw ex2;
  102. }
  103. catch (Throwable ex2) {
  104. if (throwableHolder.throwable != null) {
  105. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  106. }
  107. throw ex2;
  108. }
  109. // Check result state: It might indicate a Throwable to rethrow.
  110. if (throwableHolder.throwable != null) {
  111. throw throwableHolder.throwable;
  112. }
  113. return result;
  114. }
  115. }

我们先去看看创建事务的逻辑

  1. @SuppressWarnings("serial")
  2. protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
  3. @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  4. // If no name specified, apply method identification as transaction name.
  5. if (txAttr != null && txAttr.getName() == null) {
  6. txAttr = new DelegatingTransactionAttribute(txAttr) {
  7. @Override
  8. public String getName() {
  9. return joinpointIdentification;
  10. }
  11. };
  12. }
  13. TransactionStatus status = null;
  14. if (txAttr != null) {
  15. if (tm != null) {
  16. status = tm.getTransaction(txAttr);
  17. }
  18. else {
  19. if (logger.isDebugEnabled()) {
  20. logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
  21. "] because no transaction manager has been configured");
  22. }
  23. }
  24. }
  25. return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  26. }

继续进入tm.getTransaction(txAttr);

  1. @Override
  2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  3. throws TransactionException {
  4. // Use defaults if no transaction definition given.
  5. //这里默认的是PROPAGATION_REQUIRED,ISOLATION_DEFAULT
  6. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  7. Object transaction = doGetTransaction();
  8. boolean debugEnabled = logger.isDebugEnabled();
  9. if (isExistingTransaction(transaction)) {
  10. // Existing transaction found -> check propagation behavior to find out how to behave.
  11. return handleExistingTransaction(def, transaction, debugEnabled);
  12. }
  13. // Check definition settings for new transaction.
  14. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
  15. throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
  16. }
  17. // No existing transaction found -> check propagation behavior to find out how to proceed.
  18. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  19. throw new IllegalTransactionStateException(
  20. "No existing transaction found for transaction marked with propagation 'mandatory'");
  21. }
  22. else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  23. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  24. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  25. SuspendedResourcesHolder suspendedResources = suspend(null);
  26. if (debugEnabled) {
  27. logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
  28. }
  29. try {
  30. //这里会开启一个事务
  31. return startTransaction(def, transaction, debugEnabled, suspendedResources);
  32. }
  33. catch (RuntimeException | Error ex) {
  34. resume(null, suspendedResources);
  35. throw ex;
  36. }
  37. }
  38. else {
  39. // Create "empty" transaction: no actual transaction, but potentially synchronization.
  40. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
  41. logger.warn("Custom isolation level specified but no actual transaction initiated; " +
  42. "isolation level will effectively be ignored: " + def);
  43. }
  44. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  45. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  46. }
  47. }

我们具体看看怎么开启事务

  1. /**
  2. * Start a new transaction.
  3. */
  4. private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
  5. boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
  6. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  7. DefaultTransactionStatus status = newTransactionStatus(
  8. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  9. doBegin(transaction, definition);
  10. prepareSynchronization(status, definition);
  11. return status;
  12. }

当然是doBegin方法啦

  1. @Override
  2. protected void doBegin(Object transaction, TransactionDefinition definition) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4. Connection con = null;
  5. try {
  6. if (!txObject.hasConnectionHolder() ||
  7. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  8. Connection newCon = obtainDataSource().getConnection();
  9. if (logger.isDebugEnabled()) {
  10. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  11. }
  12. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  13. }
  14. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  15. //这里获取一个数据库连接
  16. con = txObject.getConnectionHolder().getConnection();
  17. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  18. txObject.setPreviousIsolationLevel(previousIsolationLevel);
  19. txObject.setReadOnly(definition.isReadOnly());
  20. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  21. // so we don't want to do it unnecessarily (for example if we've explicitly
  22. // configured the connection pool to set it already).
  23. if (con.getAutoCommit()) {
  24. txObject.setMustRestoreAutoCommit(true);
  25. if (logger.isDebugEnabled()) {
  26. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  27. }
  28. //关闭自动提交:这里就开启了事务,事务的提交由我们自己控制
  29. con.setAutoCommit(false);
  30. }
  31. prepareTransactionalConnection(con, definition);
  32. txObject.getConnectionHolder().setTransactionActive(true);
  33. int timeout = determineTimeout(definition);
  34. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  35. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  36. }
  37. // Bind the connection holder to the thread.
  38. if (txObject.isNewConnectionHolder()) {
  39. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  40. }
  41. }
  42. catch (Throwable ex) {
  43. if (txObject.isNewConnectionHolder()) {
  44. DataSourceUtils.releaseConnection(con, obtainDataSource());
  45. txObject.setConnectionHolder(null, false);
  46. }
  47. throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  48. }
  49. }

找到了我们的关键性代码

  1. con.setAutoCommit(false);

关闭了自动提交。好了这里如果需要事务的话就开启,我们这里开启了再回到我们之前的代码

  1. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  2. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  3. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  4. Object retVal;
  5. try {
  6. // This is an around advice: Invoke the next interceptor in the chain.
  7. // This will normally result in a target object being invoked.
  8. //这里会先去执行我们的业务逻辑方法
  9. retVal = invocation.proceedWithInvocation();
  10. }
  11. catch (Throwable ex) {
  12. //如果有异常,则会到这里去回滚
  13. // target invocation exception
  14. completeTransactionAfterThrowing(txInfo, ex);
  15. throw ex;
  16. }
  17. finally {
  18. cleanupTransactionInfo(txInfo);
  19. }
  20. if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  21. // Set rollback-only in case of Vavr failure matching our rollback rules...
  22. TransactionStatus status = txInfo.getTransactionStatus();
  23. if (status != null && txAttr != null) {
  24. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  25. }
  26. }
  27. commitTransactionAfterReturning(txInfo);
  28. return retVal;
  29. }

开启事务后,这里会先执行业务逻辑方法

  1. retVal = invocation.proceedWithInvocation();

这行代码会回到我们再AOP里面熟悉的

  1. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
  2. return invokeJoinpoint();
  3. }

而 invokeJoinpoint()会执行具体的业务逻辑。

如果有抛出异常,这里会被catch方法捕获到

我们看下回滚的逻辑

  1. protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
  2. if (txInfo != null && txInfo.getTransactionStatus() != null) {
  3. if (logger.isTraceEnabled()) {
  4. logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
  5. "] after exception: " + ex);
  6. }
  7. //这里需要检查是否需要回滚
  8. if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
  9. try {
  10. txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
  11. }
  12. catch (TransactionSystemException ex2) {
  13. logger.error("Application exception overridden by rollback exception", ex);
  14. ex2.initApplicationException(ex);
  15. throw ex2;
  16. }
  17. catch (RuntimeException | Error ex2) {
  18. logger.error("Application exception overridden by rollback exception", ex);
  19. throw ex2;
  20. }
  21. }
  22. else {
  23. // We don't roll back on this exception.
  24. // Will still roll back if TransactionStatus.isRollbackOnly() is true.
  25. try {
  26. //如果不是运行期异常,我们还是会回滚的
  27. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  28. }
  29. catch (TransactionSystemException ex2) {
  30. logger.error("Application exception overridden by commit exception", ex);
  31. ex2.initApplicationException(ex);
  32. throw ex2;
  33. }
  34. catch (RuntimeException | Error ex2) {
  35. logger.error("Application exception overridden by commit exception", ex);
  36. throw ex2;
  37. }
  38. }
  39. }
  40. }

我们先判断了一下这个异常是否是

  1. @Override
  2. public boolean rollbackOn(Throwable ex) {
  3. return (ex instanceof RuntimeException || ex instanceof Error);
  4. }

RuntimeException,如果是运行期异常,如果是才会回滚,否则就直接提交。

如果没有抛出异常就会执行到

  1. commitTransactionAfterReturning(txInfo);

很明显就是提交事务啦

  1. /**
  2. * Execute after successful completion of call, but not after an exception was handled.
  3. * Do nothing if we didn't create a transaction.
  4. * @param txInfo information about the current transaction
  5. */
  6. protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  7. if (txInfo != null && txInfo.getTransactionStatus() != null) {
  8. if (logger.isTraceEnabled()) {
  9. logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
  10. }
  11. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  12. }
  13. }

好了!其实总结一句,看AOP还是事务,还是别的什么组件,主要是看Advisor里面的advice。

 261

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2