Commit efea34c87409a5e8e289e83bc46276e7d9793325

Authored by 徐烜
1 parent 799dc67f

添加异步commit,cancel支持

Showing 14 changed files with 196 additions and 25 deletions
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TransactionManager.java
... ... @@ -10,6 +10,8 @@ import org.mengyun.tcctransaction.repository.TransactionRepository;
10 10 import org.slf4j.Logger;
11 11 import org.slf4j.LoggerFactory;
12 12  
  13 +import java.util.concurrent.ExecutorService;
  14 +
13 15 /**
14 16 * Created by changmingxie on 10/26/15.
15 17 * 事务管理器:开始事务,提交事务,回滚事务。
... ... @@ -26,6 +28,12 @@ public class TransactionManager {
26 28 this.transactionRepository = transactionRepository;
27 29 }
28 30  
  31 + /** 异步执行service */
  32 + private ExecutorService executorService;
  33 + public void setExecutorService(ExecutorService executorService) {
  34 + this.executorService = executorService;
  35 + }
  36 +
29 37 /** 事务线程局部变量 */
30 38 private ThreadLocal<Transaction> threadLocalTransaction = new ThreadLocal<Transaction>();
31 39 public Transaction getCurrentTransaction() {
... ... @@ -47,14 +55,36 @@ public class TransactionManager {
47 55 /**
48 56 * 提交事务.
49 57 */
50   - public void commit() {
  58 + public void commit(boolean asyncCommit) {
51 59 LOG.debug("==>TransactionManager commit()");
52   - Transaction transaction = getCurrentTransaction();
  60 + final Transaction transaction = getCurrentTransaction();
53 61  
54 62 transaction.changeStatus(TransactionStatus.CONFIRMING);
55 63 LOG.debug("==>TransactionManager update transaction status to CONFIRMING");
56 64 transactionRepository.update(transaction);
57 65  
  66 + if (asyncCommit) {
  67 + try {
  68 + Long statTime = System.currentTimeMillis();
  69 + // TODO:后面会对submit返回的Future做处理
  70 + executorService.submit(new Runnable() {
  71 + @Override
  72 + public void run() {
  73 + commitTransaction(transaction);
  74 + }
  75 + });
  76 + LOG.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
  77 + } catch (Throwable exp) {
  78 + LOG.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", exp);
  79 + throw new ConfirmingException(exp);
  80 + }
  81 + } else {
  82 + commitTransaction(transaction);
  83 + }
  84 +
  85 +
  86 + }
  87 + private void commitTransaction(Transaction transaction) {
58 88 try {
59 89 LOG.info("==>TransactionManager transaction begin commit()");
60 90 transaction.commit();
... ... @@ -68,13 +98,31 @@ public class TransactionManager {
68 98 /**
69 99 * 回滚事务.
70 100 */
71   - public void rollback() {
  101 + public void rollback(boolean asyncRollback) {
72 102 LOG.debug("==>TransactionManager rollback()");
73 103  
74   - Transaction transaction = getCurrentTransaction();
  104 + final Transaction transaction = getCurrentTransaction();
75 105 transaction.changeStatus(TransactionStatus.CANCELLING);
76 106 transactionRepository.update(transaction);
77 107  
  108 + if (asyncRollback) {
  109 + try {
  110 + executorService.submit(new Runnable() {
  111 + @Override
  112 + public void run() {
  113 + rollbackTransaction(transaction);
  114 + }
  115 + });
  116 + } catch (Throwable exp) {
  117 + LOG.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", exp);
  118 + throw new CancellingException(exp);
  119 + }
  120 + } else {
  121 + rollbackTransaction(transaction);
  122 + }
  123 + }
  124 +
  125 + private void rollbackTransaction(Transaction transaction) {
78 126 try {
79 127 LOG.info("==>TransactionManager transaction begin rollback()");
80 128 transaction.rollback();
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/interceptor/CompensableTransactionInterceptor.java
... ... @@ -54,20 +54,22 @@ public class CompensableTransactionInterceptor {
54 54  
55 55 /**
56 56 * 主事务方法的处理.
57   - * @param compensableMethodContext
  57 + * @param methodContext
58 58 * @throws Throwable
59 59 */
60   - private Object rootMethodProceed(MethodContext compensableMethodContext) throws Throwable {
  60 + private Object rootMethodProceed(MethodContext methodContext) throws Throwable {
61 61 LOG.debug("==>rootMethodProceed");
  62 + boolean asyncConfirm = methodContext.isAsyncConfirm();
  63 + boolean asyncCancel = methodContext.isAsyncCancel();
62 64  
63 65 TransactionManager transactionManager = transactionConfigurator.getTransactionManager();
64 66 transactionManager.begin(); // 事务开始(创建事务日志记录,并在当前线程缓存该事务日志记录)
65 67  
66   - Object returnValue = null; // 返回值
  68 + Object returnValue; // 返回值
67 69 try {
68 70  
69 71 LOG.debug("==>rootMethodProceed try begin");
70   - returnValue = compensableMethodContext.proceed(); // Try (开始执行被拦截的方法)
  72 + returnValue = methodContext.proceed(); // Try (开始执行被拦截的方法)
71 73 LOG.debug("==>rootMethodProceed try end");
72 74  
73 75 } catch (OptimisticLockException e) {
... ... @@ -75,25 +77,28 @@ public class CompensableTransactionInterceptor {
75 77 throw e; //do not rollback, waiting for recovery job
76 78 } catch (Throwable tryingException) {
77 79 LOG.warn("compensable transaction trying failed.", tryingException);
78   - transactionManager.rollback();
  80 + transactionManager.rollback(asyncCancel);
79 81 throw tryingException;
80 82 }
81 83  
82 84 LOG.info("===>rootMethodProceed begin commit()");
83   - transactionManager.commit(); // Try检验正常后提交(事务管理器在控制提交)
  85 + transactionManager.commit(asyncConfirm); // Try检验正常后提交(事务管理器在控制提交)
84 86  
85 87 return returnValue;
86 88 }
87 89  
88 90 /**
89 91 * 服务提供者事务方法处理.
90   - * @param compensableMethodContext
  92 + * @param methodContext
91 93 * @throws Throwable
92 94 */
93   - private Object providerMethodProceed(MethodContext compensableMethodContext) throws Throwable {
94   - TransactionContext transactionContext = compensableMethodContext.getTransactionContext();
  95 + private Object providerMethodProceed(MethodContext methodContext) throws Throwable {
  96 + TransactionContext transactionContext = methodContext.getTransactionContext();
  97 + boolean asyncConfirm = methodContext.isAsyncConfirm();
  98 + boolean asyncCancel = methodContext.isAsyncCancel();
95 99  
96   - LOG.debug("==>providerMethodProceed transactionStatus:{}", TransactionStatus.valueOf(transactionContext.getStatus()).toString());
  100 + LOG.debug("==>providerMethodProceed transactionStatus:{}",
  101 + TransactionStatus.valueOf(transactionContext.getStatus()).toString());
97 102  
98 103 switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
99 104 case TRYING:
... ... @@ -101,15 +106,15 @@ public class CompensableTransactionInterceptor {
101 106 // 基于全局事务ID扩展创建新的分支事务,并存于当前线程的事务局部变量中.
102 107 transactionConfigurator.getTransactionManager().propagationNewBegin(transactionContext);
103 108 LOG.debug("==>providerMethodProceed try end");
104   - return compensableMethodContext.proceed();
  109 + return methodContext.proceed();
105 110 case CONFIRMING:
106 111 try {
107 112 LOG.debug("==>providerMethodProceed confirm begin");
108 113 // 找出存在的事务并处理.
109 114 transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
110   - transactionConfigurator.getTransactionManager().commit(); // 提交
  115 + transactionConfigurator.getTransactionManager().commit(asyncConfirm); // 提交
111 116 LOG.debug("==>providerMethodProceed confirm end");
112   - } catch (NoExistedTransactionException excepton) {
  117 + } catch (NoExistedTransactionException exception) {
113 118 //the transaction has been commit,ignore it.
114 119 }
115 120 break;
... ... @@ -117,7 +122,7 @@ public class CompensableTransactionInterceptor {
117 122 try {
118 123 LOG.debug("==>providerMethodProceed cancel begin");
119 124 transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
120   - transactionConfigurator.getTransactionManager().rollback(); // 回滚
  125 + transactionConfigurator.getTransactionManager().rollback(asyncCancel); // 回滚
121 126 LOG.debug("==>providerMethodProceed cancel end");
122 127 } catch (NoExistedTransactionException exception) {
123 128 //the transaction has been rollback,ignore it.
... ... @@ -125,7 +130,7 @@ public class CompensableTransactionInterceptor {
125 130 break;
126 131 }
127 132  
128   - Method method = compensableMethodContext.getMethod();
  133 + Method method = methodContext.getMethod();
129 134 return ReflectionUtils.getNullValue(method.getReturnType());
130 135 }
131 136  
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/interceptor/ResourceCoordinatorInterceptor.java
... ... @@ -67,7 +67,7 @@ public class ResourceCoordinatorInterceptor {
67 67 }
68 68  
69 69 LOG.debug("==>pjp.proceed(pjp.getArgs())");
70   - return pjp.proceed(pjp.getArgs());
  70 + return methodContext.proceedWithInArgs();
71 71 }
72 72  
73 73 /**
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/interceptor/invoke/MethodContext.java
... ... @@ -118,6 +118,14 @@ public class MethodContext {
118 118 }
119 119  
120 120 /**
  121 + * 是否异步confirm。
  122 + * @return
  123 + */
  124 + public boolean isAsyncConfirm() {
  125 + return this.compensable.asyncConfirm();
  126 + }
  127 +
  128 + /**
121 129 * 获取cancel方法名。
122 130 * @return
123 131 */
... ... @@ -126,7 +134,15 @@ public class MethodContext {
126 134 }
127 135  
128 136 /**
129   - * 执行方法。
  137 + * 是否异步cancel。
  138 + * @return
  139 + */
  140 + public boolean isAsyncCancel() {
  141 + return this.compensable.asyncCancel();
  142 + }
  143 +
  144 + /**
  145 + * 执行方法(切入点定义的是标注)。
130 146 * @return
131 147 * @throws Throwable
132 148 */
... ... @@ -134,5 +150,14 @@ public class MethodContext {
134 150 return this.proceedingJoinPoint.proceed();
135 151 }
136 152  
  153 + /**
  154 + * 执行方法(切入点定义的是方法签名)
  155 + * @return
  156 + * @throws Throwable
  157 + */
  158 + public Object proceedWithInArgs() throws Throwable {
  159 + return this.proceedingJoinPoint.proceed(this.getProceedingJoinPoint().getArgs());
  160 + }
  161 +
137 162  
138 163 }
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recover/RecoverConfig.java
... ... @@ -22,4 +22,23 @@ public interface RecoverConfig {
22 22 * @return
23 23 */
24 24 String getCronExpression();
  25 +
  26 + /**
  27 + * 获取线程池中核心线程数的最大值.
  28 + * @return
  29 + */
  30 + int getAsyncTerminateThreadCorePoolSize();
  31 +
  32 + /**
  33 + * 获取线程池中能拥有最多线程数。
  34 + * @return
  35 + */
  36 + int getAsyncTerminateThreadMaxPoolSize();
  37 +
  38 + /**
  39 + * 获取用于缓存任务的阻塞队列。
  40 + * @return
  41 + */
  42 + int getAsyncTerminateThreadWorkQueueSize();
  43 +
25 44 }
... ...
tcc-transaction-core/target/classes/org/mengyun/tcctransaction/TransactionManager.class
No preview for this file type
tcc-transaction-core/target/classes/org/mengyun/tcctransaction/interceptor/CompensableTransactionInterceptor$1.class
No preview for this file type
tcc-transaction-core/target/classes/org/mengyun/tcctransaction/interceptor/CompensableTransactionInterceptor.class
No preview for this file type
tcc-transaction-core/target/classes/org/mengyun/tcctransaction/interceptor/ResourceCoordinatorInterceptor.class
No preview for this file type
tcc-transaction-core/target/classes/org/mengyun/tcctransaction/recover/RecoverConfig.class
No preview for this file type
tcc-transaction-spring/src/main/java/org/mengyun/tcctransaction/spring/recover/DefaultRecoverConfig.java
... ... @@ -21,6 +21,28 @@ public class DefaultRecoverConfig implements RecoverConfig {
21 21 /** 恢复Job触发间隔配置,默认是(每分钟) */
22 22 private String cronExpression = "0 */1 * * * ?";
23 23  
  24 + /** 线程池中核心线程数的最大值 */
  25 + private int asyncTerminateThreadCorePoolSize = 512;
  26 + /** 线程池中能拥有最多线程数 */
  27 + private int asyncTerminateThreadMaxPoolSize = 1024;
  28 + /** 用于缓存任务的阻塞队列 */
  29 + private int asyncTerminateThreadWorkQueueSize = 512;
  30 +
  31 + @Override
  32 + public int getAsyncTerminateThreadCorePoolSize() {
  33 + return asyncTerminateThreadCorePoolSize;
  34 + }
  35 +
  36 + @Override
  37 + public int getAsyncTerminateThreadMaxPoolSize() {
  38 + return asyncTerminateThreadMaxPoolSize;
  39 + }
  40 +
  41 + @Override
  42 + public int getAsyncTerminateThreadWorkQueueSize() {
  43 + return asyncTerminateThreadWorkQueueSize;
  44 + }
  45 +
24 46 @Override
25 47 public int getMaxRetryCount() {
26 48 return maxRetryCount;
... ...
tcc-transaction-spring/src/main/java/org/mengyun/tcctransaction/spring/support/TccTransactionConfigurator.java
... ... @@ -9,6 +9,8 @@ import org.mengyun.tcctransaction.support.TransactionConfigurator;
9 9 import org.springframework.beans.factory.annotation.Autowired;
10 10  
11 11 import javax.annotation.PostConstruct;
  12 +import java.util.concurrent.*;
  13 +import java.util.concurrent.atomic.AtomicInteger;
12 14  
13 15 /**
14 16 * Created by changmingxie on 11/11/15.
... ... @@ -32,15 +34,16 @@ public class TccTransactionConfigurator implements TransactionConfigurator {
32 34 return recoverConfig;
33 35 }
34 36  
35   - /**
36   - * 根据事务配置器创建事务管理器.
37   - */
  37 + /** 事务管理器 */
38 38 private TransactionManager transactionManager = new TransactionManager();
39 39 @Override
40 40 public TransactionManager getTransactionManager() {
41 41 return transactionManager;
42 42 }
43 43  
  44 + /** 线程池配置 */
  45 + private static volatile ExecutorService executorService;
  46 +
44 47 @PostConstruct
45 48 public void init() {
46 49 // 属性注入后调用
... ... @@ -52,10 +55,59 @@ public class TccTransactionConfigurator implements TransactionConfigurator {
52 55 recoverConfig.getRecoverDuration()
53 56 );
54 57 }
  58 +
55 59 // 2、事务配置器配置事务repo
56 60 transactionManager.setTransactionRepository(transactionRepository);
57 61  
58   - // TODO:3、executeService配置
  62 + // 3、线程池配置
  63 + if (executorService == null) {
  64 + executorService = new ThreadPoolExecutor(
  65 + recoverConfig.getAsyncTerminateThreadCorePoolSize(), // 核心线程数
  66 + recoverConfig.getAsyncTerminateThreadMaxPoolSize(), // 最大线程数
  67 + 5L, // 最大空闲时间
  68 + TimeUnit.SECONDS, // 时间单位(秒)
  69 + new ArrayBlockingQueue<Runnable>( // 缓存任务的阻塞队列
  70 + recoverConfig.getAsyncTerminateThreadWorkQueueSize()),
  71 + new ThreadFactory() { // 创建线程的工厂
  72 + final AtomicInteger poolNumber = new AtomicInteger(1);
  73 + final ThreadGroup group;
  74 + final AtomicInteger threadNumber = new AtomicInteger(1);
  75 + final String namePrefix;
  76 +
  77 + {
  78 + SecurityManager securityManager = System.getSecurityManager();
  79 + if (securityManager == null) {
  80 + this.group = Thread.currentThread().getThreadGroup();
  81 + } else {
  82 + this.group = securityManager.getThreadGroup();
  83 + }
  84 + this.namePrefix = "tcc-async-terminate-pool-" + poolNumber.getAndIncrement() + "-thread-";
  85 + }
  86 +
  87 + @Override
  88 + public Thread newThread(Runnable r) {
  89 + Thread thread = new Thread(
  90 + this.group,
  91 + r,
  92 + this.namePrefix + this.threadNumber.getAndIncrement(),
  93 + 0L
  94 + );
  95 + if (thread.isDaemon()) {
  96 + thread.setDaemon(false);
  97 + }
  98 +
  99 + if (thread.getPriority() != 5) {
  100 + thread.setPriority(5);
  101 + }
  102 +
  103 + return thread;
  104 + }
  105 + },
  106 + new ThreadPoolExecutor.CallerRunsPolicy() // 线程池满了且任务队列也满了,由向线程池提交任务的线程来执行该任务
  107 + );
  108 + }
  109 + transactionManager.setExecutorService(executorService);
  110 +
59 111 }
60 112  
61 113  
... ...
tcc-transaction-spring/target/classes/org/mengyun/tcctransaction/spring/recover/DefaultRecoverConfig.class
No preview for this file type
tcc-transaction-spring/target/classes/org/mengyun/tcctransaction/spring/support/TccTransactionConfigurator.class
No preview for this file type