Commit 80d5d9522ef7c160198e6b40988d21739e3e8efc

Authored by 徐烜
1 parent 7d9feadf

1、优化异步commit,cancel操作,在线程池调用中propagationExistBegin事务,具体看代码

2、添加新的事务恢复类TransactionRecovery_1_2
... ... @@ -31,6 +31,8 @@
31 31  
32 32 <toolkit-google-guava.version>19.0</toolkit-google-guava.version>
33 33 <toolkit-javac-janino.version>2.7.8</toolkit-javac-janino.version>
  34 + <toolkit-apache-commons-lang3.version>3.4</toolkit-apache-commons-lang3.version>
  35 + <toolkit-alibaba-fastjson.version>1.2.62</toolkit-alibaba-fastjson.version>
34 36  
35 37 <fullstack-springframework.version>4.2.5.RELEASE</fullstack-springframework.version>
36 38  
... ... @@ -60,6 +62,22 @@
60 62 </dependency>
61 63 <!-- google相关 end -->
62 64  
  65 + <!-- apache-common相关 begin -->
  66 + <dependency>
  67 + <groupId>org.apache.commons</groupId>
  68 + <artifactId>commons-lang3</artifactId>
  69 + <version>${toolkit-apache-commons-lang3.version}</version>
  70 + </dependency>
  71 + <!-- apache-common相关 end -->
  72 +
  73 + <!-- json相关 begin -->
  74 + <dependency>
  75 + <groupId>com.alibaba</groupId>
  76 + <artifactId>fastjson</artifactId>
  77 + <version>${toolkit-alibaba-fastjson.version}</version>
  78 + </dependency>
  79 + <!-- json相关 end -->
  80 +
63 81 <!-- 日志处理相关 begin -->
64 82 <dependency>
65 83 <groupId>org.slf4j</groupId>
... ...
tcc-transaction-core/pom.xml
... ... @@ -24,6 +24,20 @@
24 24 </dependency>
25 25 <!-- google相关 end -->
26 26  
  27 + <!-- apache commons相关 begin -->
  28 + <dependency>
  29 + <groupId>org.apache.commons</groupId>
  30 + <artifactId>commons-lang3</artifactId>
  31 + </dependency>
  32 + <!-- apache commons相关 end -->
  33 +
  34 + <!-- json相关 begin -->
  35 + <dependency>
  36 + <groupId>com.alibaba</groupId>
  37 + <artifactId>fastjson</artifactId>
  38 + </dependency>
  39 + <!-- json相关 end -->
  40 +
27 41 <!-- 日志处理相关 begin -->
28 42 <dependency>
29 43 <groupId>org.slf4j</groupId>
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TransactionManager.java
... ... @@ -2,6 +2,7 @@ package org.mengyun.tcctransaction;
2 2  
3 3 import org.mengyun.tcctransaction.api.TransactionContext;
4 4 import org.mengyun.tcctransaction.api.TransactionStatus;
  5 +import org.mengyun.tcctransaction.api.TransactionXid;
5 6 import org.mengyun.tcctransaction.api.UuidUtils;
6 7 import org.mengyun.tcctransaction.exception.CancellingException;
7 8 import org.mengyun.tcctransaction.exception.ConfirmingException;
... ... @@ -70,7 +71,22 @@ public class TransactionManager {
70 71 executorService.submit(new Runnable() {
71 72 @Override
72 73 public void run() {
73   - commitTransaction(transaction);
  74 + // 注意:不同线程必须这样做,否则在业务方法中无法获取Transaction
  75 + // 传播需要恢复的事务上下文
  76 + try {
  77 + TransactionContext transactionContext = new TransactionContext(
  78 + new TransactionXid(
  79 + transaction.getXid().getGlobalTransactionId(),
  80 + transaction.getXid().getBranchQualifier()),
  81 + transaction.getStatus().getId()
  82 + );
  83 + propagationExistBegin(transactionContext);
  84 +
  85 + commitTransaction(transaction);
  86 + } catch (NoExistedTransactionException exp) {
  87 + LOG.error("async propagation transaction error!", exp);
  88 + }
  89 +
74 90 }
75 91 });
76 92 LOG.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
... ... @@ -110,7 +126,22 @@ public class TransactionManager {
110 126 executorService.submit(new Runnable() {
111 127 @Override
112 128 public void run() {
113   - rollbackTransaction(transaction);
  129 + // 注意:不同线程必须这样做,否则在业务方法中无法获取Transaction
  130 + // 传播需要恢复的事务上下文
  131 + try {
  132 + TransactionContext transactionContext = new TransactionContext(
  133 + new TransactionXid(
  134 + transaction.getXid().getGlobalTransactionId(),
  135 + transaction.getXid().getBranchQualifier()),
  136 + transaction.getStatus().getId()
  137 + );
  138 + propagationExistBegin(transactionContext);
  139 +
  140 + rollbackTransaction(transaction);
  141 + } catch (NoExistedTransactionException exp) {
  142 + LOG.error("async propagation transaction error!", exp);
  143 + }
  144 +
114 145 }
115 146 });
116 147 } catch (Throwable exp) {
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recover/TransactionRecovery.java
1 1 package org.mengyun.tcctransaction.recover;
2 2  
3   -import org.mengyun.tcctransaction.Transaction;
4   -import org.mengyun.tcctransaction.TransactionType;
5   -import org.mengyun.tcctransaction.api.TransactionContext;
6   -import org.mengyun.tcctransaction.api.TransactionStatus;
7   -import org.mengyun.tcctransaction.api.TransactionXid;
8   -import org.mengyun.tcctransaction.repository.TransactionRepository;
9   -import org.mengyun.tcctransaction.support.TransactionConfigurator;
10   -import org.slf4j.Logger;
11   -import org.slf4j.LoggerFactory;
12   -
13   -import java.util.ArrayList;
14   -import java.util.Calendar;
15   -import java.util.Date;
16   -import java.util.List;
17   -
18 3 /**
19   - * Created by changmingxie on 11/10/15.
20   - * 事务恢复.
  4 + * 事务恢复接口。
21 5 */
22   -public class TransactionRecovery {
23   - /** 日志记录器 */
24   - private static final Logger LOG = LoggerFactory.getLogger(TransactionRecovery.class);
25   -
26   - /** TCC事务配置器 */
27   - private TransactionConfigurator transactionConfigurator;
28   - public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
29   - this.transactionConfigurator = transactionConfigurator;
30   - }
31   -
  6 +public interface TransactionRecovery {
32 7 /**
33 8 * 启动事务恢复操作(被RecoverScheduledJob定时任务调用).
34 9 */
35   - public void startRecover() {
36   - List<Transaction> transactions = loadErrorTransactions(); // 找出所有执行错误的事务信息
37   - recoverErrorTransactions(transactions);
38   - }
39   -
40   - /**
41   - * 找出所有执行错误的事务信息
42   - * @return
43   - */
44   - private List<Transaction> loadErrorTransactions() {
45   -
46   - TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
47   -
48   - long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
49   -
50   - // 找出相关时间内的异常ROOT类型的事务
51   - List<Transaction> transactions = transactionRepository.findAllUnmodifiedSince(
52   - new Date(currentTimeInMillis - transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000),
53   - TransactionType.ROOT);
54   -
55   - List<Transaction> recoverTransactions = new ArrayList<>();
56   -
57   - for (Transaction transaction : transactions) {
58   - // 检验记录是否已经被修改(版本校验)
59   - int result = transactionRepository.update(transaction);
60   -
61   - if (result > 0) {
62   - recoverTransactions.add(transaction);
63   - }
64   - }
65   -
66   - // 日志输出,调试用
67   - if (!transactions.isEmpty()){
68   - LOG.debug("==>TransactionRecovery loadErrorTransactions transactions size:" + transactions.size());
69   - }
70   -
71   - return recoverTransactions;
72   - }
73   -
74   -
75   - /**
76   - * 恢复错误的事务.
77   - * @param transactions
78   - */
79   - private void recoverErrorTransactions(List<Transaction> transactions) {
80   -
81   - for (Transaction transaction : transactions) {
82   -
83   - if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
84   - // 超过次数的,跳过
85   - LOG.error(String.format(
86   - "TransactionRecovery recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d",
87   - transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount()));
88   - continue;
89   - }
90   -
91   - try {
92   - transaction.addRetriedCount(); // 重试次数+1
93   -
94   - // 传播需要恢复的事务上下文
95   - TransactionContext transactionContext = new TransactionContext(
96   - new TransactionXid(
97   - transaction.getXid().getGlobalTransactionId(),
98   - transaction.getXid().getBranchQualifier()),
99   - transaction.getStatus().getId()
100   - );
101   - transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
102   -
103   - if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
104   - // 如果是CONFIRMING(2)状态,则将事务往前执行
105   - transaction.changeStatus(TransactionStatus.CONFIRMING);
106   - transactionConfigurator.getTransactionRepository().update(transaction);
107   - transaction.commit();
108   -
109   - } else {
110   - // 其他情况,把事务状态改为CANCELLING(3),然后执行回滚
111   - transaction.changeStatus(TransactionStatus.CANCELLING);
112   - transactionConfigurator.getTransactionRepository().update(transaction);
113   - transaction.rollback();
114   - }
115   -
116   - // 其他情况下,超时没处理的事务日志直接删除
117   - transactionConfigurator.getTransactionRepository().delete(transaction);
118   - } catch (Throwable e) {
119   - LOG.warn(String.format(
120   - "TransactionRecovery recover failed, txid:%s, status:%s,retried count:%d",
121   - transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount()), e);
122   - }
123   - }
124   - }
  10 + void startRecover();
125 11 }
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recover/TransactionRecovery_1_1.java 0 → 100644
  1 +package org.mengyun.tcctransaction.recover;
  2 +
  3 +import org.mengyun.tcctransaction.Transaction;
  4 +import org.mengyun.tcctransaction.TransactionType;
  5 +import org.mengyun.tcctransaction.api.TransactionContext;
  6 +import org.mengyun.tcctransaction.api.TransactionStatus;
  7 +import org.mengyun.tcctransaction.api.TransactionXid;
  8 +import org.mengyun.tcctransaction.repository.TransactionRepository;
  9 +import org.mengyun.tcctransaction.support.TransactionConfigurator;
  10 +import org.slf4j.Logger;
  11 +import org.slf4j.LoggerFactory;
  12 +
  13 +import java.util.ArrayList;
  14 +import java.util.Calendar;
  15 +import java.util.Date;
  16 +import java.util.List;
  17 +
  18 +/**
  19 + * Created by changmingxie on 11/10/15.
  20 + * 事务恢复.
  21 + */
  22 +public class TransactionRecovery_1_1 implements TransactionRecovery {
  23 + /** 日志记录器 */
  24 + private static final Logger LOG = LoggerFactory.getLogger(TransactionRecovery_1_1.class);
  25 +
  26 + /** TCC事务配置器 */
  27 + private TransactionConfigurator transactionConfigurator;
  28 + public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
  29 + this.transactionConfigurator = transactionConfigurator;
  30 + }
  31 +
  32 + @Override
  33 + public void startRecover() {
  34 + List<Transaction> transactions = loadErrorTransactions(); // 找出所有执行错误的事务信息
  35 + recoverErrorTransactions(transactions);
  36 + }
  37 +
  38 + /**
  39 + * 找出所有执行错误的事务信息(时间范围和事务类型)。
  40 + * @return
  41 + */
  42 + private List<Transaction> loadErrorTransactions() {
  43 +
  44 + TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
  45 +
  46 + long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
  47 +
  48 + // 找出相关时间内的异常ROOT类型的事务
  49 + List<Transaction> transactions = transactionRepository.findAllUnmodifiedSince(
  50 + new Date(currentTimeInMillis - transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000),
  51 + TransactionType.ROOT);
  52 +
  53 + List<Transaction> recoverTransactions = new ArrayList<>();
  54 +
  55 + for (Transaction transaction : transactions) {
  56 + // 检验记录是否已经被修改(版本校验)
  57 + int result = transactionRepository.update(transaction);
  58 +
  59 + if (result > 0) {
  60 + recoverTransactions.add(transaction);
  61 + }
  62 + }
  63 +
  64 + // 日志输出,调试用
  65 + if (!transactions.isEmpty()){
  66 + LOG.debug("==>TransactionRecovery loadErrorTransactions transactions size:" + transactions.size());
  67 + }
  68 +
  69 + return recoverTransactions;
  70 + }
  71 +
  72 +
  73 + /**
  74 + * 恢复错误的事务.
  75 + * @param transactions
  76 + */
  77 + private void recoverErrorTransactions(List<Transaction> transactions) {
  78 +
  79 + for (Transaction transaction : transactions) {
  80 +
  81 + if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
  82 + // 超过最大恢复次数的,跳过
  83 + LOG.error(String.format(
  84 + "TransactionRecovery recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d",
  85 + transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount()));
  86 + continue;
  87 + }
  88 +
  89 + try {
  90 + transaction.addRetriedCount(); // 重试次数+1
  91 +
  92 + // 传播需要恢复的事务上下文
  93 + TransactionContext transactionContext = new TransactionContext(
  94 + new TransactionXid(
  95 + transaction.getXid().getGlobalTransactionId(),
  96 + transaction.getXid().getBranchQualifier()),
  97 + transaction.getStatus().getId()
  98 + );
  99 + transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
  100 +
  101 + if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
  102 + // 如果是CONFIRMING(2)状态,则将事务往前执行
  103 + transaction.changeStatus(TransactionStatus.CONFIRMING);
  104 + transactionConfigurator.getTransactionRepository().update(transaction);
  105 + transaction.commit();
  106 +
  107 + } else {
  108 + // 其他情况,把事务状态改为CANCELLING(3),然后执行回滚
  109 + transaction.changeStatus(TransactionStatus.CANCELLING);
  110 + transactionConfigurator.getTransactionRepository().update(transaction);
  111 + transaction.rollback();
  112 + }
  113 +
  114 + // 其他情况下,超时没处理的事务日志直接删除
  115 + transactionConfigurator.getTransactionRepository().delete(transaction);
  116 + } catch (Throwable e) {
  117 + LOG.warn(String.format(
  118 + "TransactionRecovery recover failed, txid:%s, status:%s,retried count:%d",
  119 + transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount()), e);
  120 + }
  121 + }
  122 + }
  123 +}
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recover/TransactionRecovery_1_2.java 0 → 100644
  1 +package org.mengyun.tcctransaction.recover;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import org.apache.commons.lang3.exception.ExceptionUtils;
  5 +import org.mengyun.tcctransaction.Transaction;
  6 +import org.mengyun.tcctransaction.TransactionType;
  7 +import org.mengyun.tcctransaction.api.TransactionContext;
  8 +import org.mengyun.tcctransaction.api.TransactionStatus;
  9 +import org.mengyun.tcctransaction.api.TransactionXid;
  10 +import org.mengyun.tcctransaction.exception.OptimisticLockException;
  11 +import org.mengyun.tcctransaction.repository.TransactionRepository;
  12 +import org.mengyun.tcctransaction.support.TransactionConfigurator;
  13 +import org.slf4j.Logger;
  14 +import org.slf4j.LoggerFactory;
  15 +
  16 +import java.util.Calendar;
  17 +import java.util.Date;
  18 +import java.util.List;
  19 +
  20 +/**
  21 + * 事务恢复1.2版本。
  22 + */
  23 +public class TransactionRecovery_1_2 implements TransactionRecovery {
  24 + /** 日志记录器 */
  25 + private static final Logger LOG = LoggerFactory.getLogger(TransactionRecovery_1_2.class);
  26 +
  27 + /** TCC事务配置器 */
  28 + private TransactionConfigurator transactionConfigurator;
  29 + public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
  30 + this.transactionConfigurator = transactionConfigurator;
  31 + }
  32 +
  33 + @Override
  34 + public void startRecover() {
  35 + List<Transaction> transactions = loadErrorTransactions(); // 找出所有执行错误的事务信息
  36 + recoverErrorTransactions(transactions);
  37 + }
  38 +
  39 + /**
  40 + * 找出所有执行错误的事务信息(时间范围)。
  41 + * @return
  42 + */
  43 + private List<Transaction> loadErrorTransactions() {
  44 +
  45 + long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
  46 +
  47 + TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
  48 + RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
  49 +
  50 + return transactionRepository.findAllUnmodifiedSince(
  51 + new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
  52 + }
  53 +
  54 + /**
  55 + * 恢复错误的事务.
  56 + * @param transactions
  57 + */
  58 + private void recoverErrorTransactions(List<Transaction> transactions) {
  59 + LOG.debug("problem transactions size:{}", transactions.size());
  60 +
  61 + for (Transaction transaction : transactions) {
  62 +
  63 + if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
  64 + // 超过最大恢复次数的,跳过(需要人工介入处理)
  65 + // 不区分ROOT/BRANCH事务
  66 + LOG.error("recover failed with max retry count, will not try again. " +
  67 + "txid:{}, status:{}, retried count:{}, transaction content:{}",
  68 + transaction.getXid(),
  69 + transaction.getStatus().getId(),
  70 + transaction.getRetriedCount(),
  71 + JSON.toJSONString(transaction));
  72 + continue;
  73 + }
  74 +
  75 + if (transaction.getTransactionType().equals(TransactionType.BRANCH)) {
  76 + // 事务可被恢复的最大时间段
  77 + long maxConfigTransactionDurationTime = transaction.getCreateTime().getTime() +
  78 + transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
  79 + transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000;
  80 + if (System.currentTimeMillis() > maxConfigTransactionDurationTime) {
  81 + // 超过最大恢复次数*(recoverDuration指定时间),跳过(需要人工介入处理)
  82 + // 并且是BRANCH事务(还是可以通过主事务触发commit/cancel的)
  83 + LOG.error("recover failed with max recoverDuration (maxRetryCount * recoveryDuration), will not try again. " +
  84 + "txid:{}, status:{}, max recoverDuration:{}(s), transaction recoverDuration:{}(s), transaction content:{}",
  85 + transaction.getXid(),
  86 + transaction.getStatus().getId(),
  87 + maxConfigTransactionDurationTime / 1000,
  88 + (System.currentTimeMillis() - transaction.getCreateTime().getTime()) / 1000,
  89 + JSON.toJSONString(transaction));
  90 + continue;
  91 + }
  92 + }
  93 +
  94 + try {
  95 + transaction.addRetriedCount(); // 重试次数+1
  96 +
  97 + // 传播需要恢复的事务上下文
  98 + TransactionContext transactionContext = new TransactionContext(
  99 + new TransactionXid(
  100 + transaction.getXid().getGlobalTransactionId(),
  101 + transaction.getXid().getBranchQualifier()),
  102 + transaction.getStatus().getId()
  103 + );
  104 + transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
  105 +
  106 + if (transaction.getStatus().equals(TransactionStatus.TRYING)) {
  107 + if (transaction.getTransactionType().equals(TransactionType.ROOT)) {
  108 + // 当事务执行超过recoverDuration指定时间,如果事务状态是TRYING(事务类型是ROOT),此时系统状态可能是,
  109 + // 可能事务try在执行中,或事务框架update中,或事务框架update出错,等等
  110 + // 注意:这些情况下,建议设计try业务方法尽可能在recoverDuration指定时间内完成
  111 + transaction.changeStatus(TransactionStatus.CANCELLING);
  112 + transactionConfigurator.getTransactionRepository().update(transaction);
  113 + transaction.rollback();
  114 + transactionConfigurator.getTransactionRepository().delete(transaction);
  115 + }
  116 +
  117 + } else if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
  118 + // 当事务执行超过recoverDuration指定时间,如果事务状态是CONFIRMING(不区分ROOT/BRANCH事务),此时系统状态可能是,
  119 + // 事务的confirm方法正在执行,等等
  120 + // 注意:这种情况下,必须保证confirm方法的幂等性
  121 + transaction.changeStatus(TransactionStatus.CONFIRMING);
  122 + transactionConfigurator.getTransactionRepository().update(transaction);
  123 + transaction.commit();
  124 + transactionConfigurator.getTransactionRepository().delete(transaction);
  125 +
  126 + } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)) {
  127 + // 当事务执行超过recoverDuration指定时间,如果事务状态是CANCELING(不区分ROOT/BRANCH事务),此时系统状态可能是,
  128 + // 事务还是TRYING状态,事务的cancel方法正在执行,等等
  129 + // 注意:这种情况下,必须保证cancel方法的幂等性
  130 + transaction.changeStatus(TransactionStatus.CANCELLING);
  131 + transactionConfigurator.getTransactionRepository().update(transaction);
  132 + transaction.rollback();
  133 + transactionConfigurator.getTransactionRepository().delete(transaction);
  134 +
  135 + } else {
  136 + LOG.error("transaction status error, txid:{}, status:{}",
  137 + transaction.getXid(),
  138 + transaction.getStatus().getId());
  139 +
  140 + }
  141 +
  142 + } catch (Throwable throwable) {
  143 + if (throwable instanceof OptimisticLockException
  144 + || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException
  145 + ) {
  146 + LOG.warn(String.format("optimisticLockException happened while recover. " +
  147 + "txid:%s, status:%s,retried count:%d,transaction content:%s",
  148 + transaction.getXid(),
  149 + transaction.getStatus().getId(),
  150 + transaction.getRetriedCount(),
  151 + JSON.toJSONString(transaction)),
  152 + throwable);
  153 + } else {
  154 + LOG.error(String.format("recover failed, " +
  155 + "txid:%s, status:%s,retried count:%d,transaction content:%s",
  156 + transaction.getXid(),
  157 + transaction.getStatus().getId(),
  158 + transaction.getRetriedCount(),
  159 + JSON.toJSONString(transaction)),
  160 + throwable);
  161 + }
  162 + }
  163 + }
  164 + }
  165 +
  166 +
  167 +
  168 +}
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/repository/TransactionRepository.java
... ... @@ -39,7 +39,16 @@ public interface TransactionRepository {
39 39  
40 40 /**
41 41 * 找出所有未处理事务日志(从某一时间点开始).
  42 + * @param date 时间
  43 + * @param transactionType 事务类型
42 44 * @return
43 45 */
44 46 List<Transaction> findAllUnmodifiedSince(Date date, TransactionType transactionType);
  47 +
  48 + /**
  49 + * 找出所有未处理事务日志(从某一时间点开始).
  50 + * @param date 时间
  51 + * @return
  52 + */
  53 + List<Transaction> findAllUnmodifiedSince(Date date);
45 54 }
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/repository/impl/CachableTransactionRepository.java
... ... @@ -114,10 +114,20 @@ public abstract class CachableTransactionRepository implements TransactionReposi
114 114 return transactions;
115 115 }
116 116  
  117 + @Override
  118 + public List<Transaction> findAllUnmodifiedSince(Date date) {
  119 + List<Transaction> transactions = doFindAllUnmodifiedSince(date);
  120 + for (Transaction transaction : transactions) {
  121 + putToCache(transaction);
  122 + }
  123 + return transactions;
  124 + }
  125 +
117 126 //-------------------- 需要覆写的方法 --------------------//
118 127 protected abstract int doCreate(Transaction transaction);
119 128 protected abstract int doUpdate(Transaction transaction);
120 129 protected abstract int doDelete(Transaction transaction);
121 130 protected abstract Transaction doFindOne(Xid xid);
122 131 protected abstract List<Transaction> doFindAllUnmodifiedSince(Date date, TransactionType transactionType);
  132 + protected abstract List<Transaction> doFindAllUnmodifiedSince(Date date);
123 133 }
... ...
tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/repository/impl/JdbcTransactionRepository.java
... ... @@ -268,10 +268,10 @@ public class JdbcTransactionRepository extends CachableTransactionRepository {
268 268 private final static String SELECT_MULTI_SQL_Template =
269 269 "SELECT GLOBAL_TX_ID, BRANCH_QUALIFIER, CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION,DOMAIN " +
270 270 "FROM %s " +
271   - "WHERE LAST_UPDATE_TIME != ? AND TRANSACTION_TYPE = ? %s";
  271 + "WHERE LAST_UPDATE_TIME < ? AND TRANSACTION_TYPE = ? %s";
272 272 @Override
273 273 protected List<Transaction> doFindAllUnmodifiedSince(Date date, TransactionType transactionType) {
274   - LOG.debug("==>doFindOne date:" + date);
  274 + LOG.debug("==>doFindOne date:{},transactionType:{}", date, transactionType);
275 275  
276 276 List<Transaction> transactions = new ArrayList<>();
277 277  
... ... @@ -315,4 +315,61 @@ public class JdbcTransactionRepository extends CachableTransactionRepository {
315 315  
316 316 return transactions;
317 317 }
  318 +
  319 + private final static String SELECT_MULTI_SQL_Template_2 =
  320 + "SELECT GLOBAL_TX_ID, BRANCH_QUALIFIER, CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION,DOMAIN " +
  321 + "FROM %s " +
  322 + "WHERE LAST_UPDATE_TIME < ? %s";
  323 +
  324 + @Override
  325 + public List<Transaction> doFindAllUnmodifiedSince(Date date) {
  326 + LOG.debug("==>doFindOne date:" + date);
  327 +
  328 + List<Transaction> transactions = new ArrayList<>();
  329 +
  330 + Connection connection = null;
  331 + PreparedStatement stmt = null;
  332 +
  333 + try {
  334 + connection = this.getConnection();
  335 +
  336 + if (Strings.isNullOrEmpty(domain)) {
  337 + stmt = connection.prepareStatement(String.format(SELECT_MULTI_SQL_Template_2,
  338 + getTableName(), ""));
  339 +
  340 + LOG.debug("sql={}", String.format(SELECT_MULTI_SQL_Template_2,
  341 + getTableName(), ""));
  342 +
  343 + } else {
  344 + stmt = connection.prepareStatement(String.format(SELECT_MULTI_SQL_Template_2,
  345 + getTableName(), "AND DOMAIN = ?"));
  346 +
  347 + LOG.debug("sql={}", String.format(SELECT_MULTI_SQL_Template_2,
  348 + getTableName(), ""));
  349 + }
  350 + stmt.setTimestamp(1, new Timestamp(date.getTime()));
  351 + if (!Strings.isNullOrEmpty(domain)) {
  352 + stmt.setString(2, domain);
  353 + }
  354 +
  355 + ResultSet resultSet = stmt.executeQuery();
  356 +
  357 + while (resultSet.next()) {
  358 + byte[] transactionBytes = resultSet.getBytes(3);
  359 + Transaction transaction = (Transaction) serializer.deserialize(transactionBytes);
  360 + transaction.setLastUpdateTime(resultSet.getDate(7));
  361 + transaction.setVersion(resultSet.getLong(9));
  362 + transaction.resetRetriedCount(resultSet.getInt(8));
  363 + transactions.add(transaction);
  364 + }
  365 +
  366 + } catch (Throwable e) {
  367 + throw new TransactionIOException(e);
  368 + } finally {
  369 + closeStatement(stmt);
  370 + this.releaseConnection(connection);
  371 + }
  372 +
  373 + return transactions;
  374 + }
318 375 }
... ...
tcc-transaction-spring/src/main/java/org/mengyun/tcctransaction/spring/recover/RecoverScheduledJob.java
... ... @@ -40,7 +40,6 @@ public class RecoverScheduledJob {
40 40 * 初始化方法,Spring启动时执行.
41 41 */
42 42 public void init() {
43   -
44 43 try {
45 44 // MethodInvokingJobDetailFactoryBean 负责生成具体的任务,只需要指定某个对象的某个方法,在触发器触发时,即调用指定对象的指定方法。
46 45 MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
... ...
tcc-transaction-spring/src/main/resources/tcc-transaction.xml
... ... @@ -45,7 +45,7 @@
45 45 <!-- 事务恢复任务调度器 -->
46 46 <bean id="recoverScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"/>
47 47 <!-- 事务恢复 -->
48   - <bean id="transactionRecovery" class="org.mengyun.tcctransaction.recover.TransactionRecovery">
  48 + <bean id="transactionRecovery" class="org.mengyun.tcctransaction.recover.TransactionRecovery_1_2">
49 49 <property name="transactionConfigurator" ref="tccTransactionConfigurator"/>
50 50 </bean>
51 51  
... ...