Commit 7ad9e686ca241a02e4c2e675d4d4c7ecd0fbb4c2

Authored by 王通
1 parent 968cf814

1.指令入库日志

src/main/java/com/bsth/data/directive/DirectivesPstThread.java
1   -package com.bsth.data.directive;
2   -
3   -import com.alibaba.fastjson.JSON;
4   -import com.bsth.data.schedule.DayOfSchedule;
5   -import com.bsth.entity.directive.D60;
6   -import com.bsth.entity.directive.D64;
7   -import com.bsth.entity.directive.Directive;
8   -import com.bsth.repository.directive.D60Repository;
9   -import com.bsth.repository.directive.D64Repository;
10   -import org.joda.time.format.DateTimeFormat;
11   -import org.joda.time.format.DateTimeFormatter;
12   -import org.slf4j.Logger;
13   -import org.slf4j.LoggerFactory;
14   -import org.springframework.beans.factory.annotation.Autowired;
15   -import org.springframework.jdbc.core.BatchPreparedStatementSetter;
16   -import org.springframework.jdbc.core.JdbcTemplate;
17   -import org.springframework.jdbc.datasource.DataSourceTransactionManager;
18   -import org.springframework.stereotype.Component;
19   -import org.springframework.transaction.TransactionDefinition;
20   -import org.springframework.transaction.TransactionStatus;
21   -import org.springframework.transaction.support.DefaultTransactionDefinition;
22   -
23   -import java.sql.PreparedStatement;
24   -import java.sql.SQLException;
25   -import java.sql.Types;
26   -import java.util.ArrayList;
27   -import java.util.List;
28   -import java.util.concurrent.ConcurrentLinkedQueue;
29   -
30   -/**
31   - * 指令持久化线程
32   - * Created by panzhao on 2017/3/6.
33   - */
34   -@Component
35   -public class DirectivesPstThread extends Thread {
36   -
37   - Logger logger = LoggerFactory.getLogger(this.getClass());
38   -
39   - @Autowired
40   - D60Repository d60Repository;
41   -
42   - @Autowired
43   - D64Repository d64Repository;
44   -
45   - @Autowired
46   - DayOfSchedule dayOfSchedule;
47   -
48   - @Autowired
49   - JdbcTemplate jdbcTemplate;
50   -
51   - private static DateTimeFormatter fmtyyyyMMdd = DateTimeFormat.forPattern("yyyy-MM-dd");
52   -
53   - @Override
54   - public void run() {
55   - try{
56   - ConcurrentLinkedQueue<Directive> list = DayOfDirectives.pstDirectives;
57   -
58   - List<D60> d60s = new ArrayList<>();
59   - List<D64> d64s = new ArrayList<>();
60   - //按 60 和 64 分组
61   - Directive directive;
62   - D60 d60;
63   - for (int i = 0; i < 2000; i++) {
64   - directive = list.poll();
65   - if(null == directive)
66   - break;
67   -
68   - //日期
69   - directive.setRq(fmtyyyyMMdd.print(directive.getTimestamp()));
70   -
71   - if (directive instanceof D60) {
72   - d60 = (D60) directive;
73   - if(isDelete(d60))
74   - continue;
75   - d60s.add(d60);
76   - }
77   - else if(directive instanceof D64)
78   - d64s.add((D64) directive);
79   - }
80   -
81   - //入库60
82   - save60(d60s);
83   - //入库64
84   - save64(d64s);
85   -
86   -
87   - // 60 指令更新(车载响应)
88   - ConcurrentLinkedQueue<D60> updateD60s = DayOfDirectives.pstD60s;
89   - d60s = new ArrayList<>();
90   - for (int i = 0; i < 2000; i++) {
91   - d60 = updateD60s.poll();
92   - if(null == d60)
93   - break;
94   - d60s.add(d60);
95   - }
96   -
97   - if(d60s.size() > 0)
98   - update60(d60s);
99   - }catch (Exception e){
100   - logger.error("指令入库出现异常", e);
101   - }
102   - }
103   -
104   - private void save64(final List<D64> d64s) {
105   - if(null == d64s || d64s.size() == 0)
106   - return;
107   -
108   - String sql = "insert into bsth_v_directive_64(device_id,error_text,http_code,oper_code,rq,sender,timestamp,city_code,line_id,txt_content,resp_ack) " +
109   - " values(?,?,?,?,?,?,?,?,?,?,?)";
110   -
111   - //编程式事务
112   - DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
113   - DefaultTransactionDefinition def = new DefaultTransactionDefinition();
114   - def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
115   - TransactionStatus status = tran.getTransaction(def);
116   -
117   - try{
118   - jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
119   - @Override
120   - public void setValues(PreparedStatement ps, int i) throws SQLException {
121   - D64 d64 = d64s.get(i);
122   - ps.setString(1 , d64.getDeviceId());
123   - ps.setString(2, isNvl(d64.getErrorText()));
124   - ps.setInt(3, d64.getHttpCode());
125   - ps.setShort(4, isNvl(d64.getOperCode()));
126   - ps.setString(5, d64.getRq());
127   -
128   - ps.setString(6, isNvl(d64.getSender()));
129   - ps.setLong(7, d64.getTimestamp());
130   -
131   - ps.setShort(8, isNvl(d64.getData().getCityCode()));
132   - ps.setString(9, isNvl(d64.getData().getLineId()));
133   - ps.setString(10, isNvl(d64.getData().getTxtContent()));
134   - ps.setShort(11, isNvl(d64.getRespAck()));
135   - }
136   -
137   - @Override
138   - public int getBatchSize() {
139   - return d64s.size();
140   - }
141   - });
142   -
143   - tran.commit(status);
144   -
145   - logger.info("64 入库成功: " + d64s.size());
146   - }catch (Exception e){
147   - tran.rollback(status);
148   - logger.error("", e);
149   - logger.warn("失败的数据:" + JSON.toJSONString(d64s));
150   - }
151   - }
152   -
153   - private void update60(final List<D60> d60s) {
154   - if(null == d60s || d60s.size() == 0)
155   - return;
156   -
157   - String sql = "update bsth_v_directive_60 set reply46=?,reply46time=?,reply47=?,reply47time=? where device_id=? and timestamp=? and msg_id=?";
158   -
159   - //编程式事务
160   - DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
161   - DefaultTransactionDefinition def = new DefaultTransactionDefinition();
162   - def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
163   - TransactionStatus status = tran.getTransaction(def);
164   -
165   - try{
166   - jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
167   - @Override
168   - public void setValues(PreparedStatement ps, int i) throws SQLException {
169   - D60 d60 = d60s.get(i);
170   - ps.setShort(1, isNvl(d60.getReply46()));
171   - if(null == d60.getReply46Time())
172   - ps.setNull(2, Types.BIGINT);
173   - else
174   - ps.setLong(2, d60.getReply46Time());
175   -
176   - ps.setShort(3, isNvl(d60.getReply47()));
177   -
178   - if(null == d60.getReply47Time())
179   - ps.setNull(4, Types.BIGINT);
180   - else
181   - ps.setLong(4, d60.getReply47Time());
182   - ps.setString(5, d60.getDeviceId());
183   - ps.setLong(6, d60.getTimestamp());
184   - ps.setInt(7, d60.getMsgId());
185   - }
186   -
187   - @Override
188   - public int getBatchSize() {
189   - return d60s.size();
190   - }
191   - });
192   -
193   - tran.commit(status);
194   -
195   - logger.info("60 更新成功: " + d60s.size());
196   - }catch (Exception e){
197   - tran.rollback(status);
198   - logger.error("", e);
199   - logger.warn("失败的数据:" + JSON.toJSONString(d60s));
200   - }
201   - }
202   -
203   - private void save60(final List<D60> d60s) {
204   - if(null == d60s || d60s.size() == 0)
205   - return;
206   -
207   - String sql = "insert into bsth_v_directive_60(device_id,error_text,http_code,oper_code,rq,sender,timestamp" +
208   - ",alarm_time,company_code,dispatch_instruct,instruct_type,msg_id,service_state,txt_content,is_dispatch" +
209   - ",line_code,reply46,reply46time,reply47,reply47time,sch) " +
210   - " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
211   -
212   - //编程式事务
213   - DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
214   - DefaultTransactionDefinition def = new DefaultTransactionDefinition();
215   - def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
216   - TransactionStatus status = tran.getTransaction(def);
217   -
218   - try{
219   - jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
220   - @Override
221   - public void setValues(PreparedStatement ps, int i) throws SQLException {
222   - D60 d60 = d60s.get(i);
223   - ps.setString(1, d60.getDeviceId());
224   - ps.setString(2, isNvl(d60.getErrorText()));
225   - ps.setInt(3, d60.getHttpCode());
226   - ps.setShort(4, d60.getOperCode());
227   - ps.setString(5, d60.getRq());
228   - ps.setString(6, d60.getSender());
229   - ps.setLong(7, d60.getTimestamp());
230   -
231   - ps.setLong(8, isNvl(d60.getData().getAlarmTime()));
232   - ps.setShort(9, isNvl(d60.getData().getCompanyCode()));
233   - ps.setShort(10, isNvl(d60.getData().getDispatchInstruct()));
234   - ps.setInt(11, d60.getData().getInstructType());
235   - ps.setInt(12, d60.getData().getMsgId());
236   - ps.setLong(13, d60.getData().getServiceState());
237   - ps.setString(14, d60.getData().getTxtContent());
238   - ps.setBoolean(15, d60.isDispatch());
239   -
240   - ps.setString(16, isNvl(d60.getLineCode()));
241   - ps.setShort(17, isNvl(d60.getReply46()));
242   -
243   - if(null == d60.getReply46Time())
244   - ps.setNull(18, Types.BIGINT);
245   - else
246   - ps.setLong(18, d60.getReply46Time());
247   -
248   - ps.setShort(19, isNvl(d60.getReply47()));
249   -
250   - if(null == d60.getReply47Time())
251   - ps.setNull(20, Types.BIGINT);
252   - else
253   - ps.setLong(20, d60.getReply47Time());
254   -
255   - if(d60.getSch()==null)
256   - ps.setNull(21, Types.BIGINT);
257   - else
258   - ps.setLong(21, d60.getSch().getId());
259   - }
260   -
261   - @Override
262   - public int getBatchSize() {
263   - return d60s.size();
264   - }
265   - });
266   -
267   - tran.commit(status);
268   -
269   - logger.info("60 入库成功: " + d60s.size());
270   - }catch (Exception e){
271   - tran.rollback(status);
272   - logger.error("", e);
273   - logger.warn("失败的数据:" + JSON.toJSONString(d60s));
274   - }
275   - }
276   -
277   - private String isNvl(String v) {
278   - return v==null?"":v;
279   - }
280   -
281   - private short isNvl(Short v) {
282   - return v==null?0:v;
283   - }
284   -
285   - private long isNvl(Long v) {
286   - return v==null?0:v;
287   - }
288   -
289   - private boolean isDelete(D60 d60){
290   - try{
291   - //如果关联的班次已经不存在了,放弃入库,很低概率出现
292   - if(d60.isDispatch() && d60.getSch().isDeleted()){
293   - logger.warn("save 指令,发现 deleted=true 的班次,id=" + d60.getSch().getId());
294   - return true;
295   - }
296   - }catch (Exception e){
297   - logger.error("", e);
298   - }
299   -
300   - return false;
301   - }
302   -}
  1 +package com.bsth.data.directive;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.bsth.data.schedule.DayOfSchedule;
  5 +import com.bsth.entity.directive.D60;
  6 +import com.bsth.entity.directive.D64;
  7 +import com.bsth.entity.directive.Directive;
  8 +import com.bsth.repository.directive.D60Repository;
  9 +import com.bsth.repository.directive.D64Repository;
  10 +import org.joda.time.format.DateTimeFormat;
  11 +import org.joda.time.format.DateTimeFormatter;
  12 +import org.slf4j.Logger;
  13 +import org.slf4j.LoggerFactory;
  14 +import org.springframework.beans.factory.annotation.Autowired;
  15 +import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  16 +import org.springframework.jdbc.core.JdbcTemplate;
  17 +import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  18 +import org.springframework.stereotype.Component;
  19 +import org.springframework.transaction.TransactionDefinition;
  20 +import org.springframework.transaction.TransactionStatus;
  21 +import org.springframework.transaction.support.DefaultTransactionDefinition;
  22 +
  23 +import java.sql.PreparedStatement;
  24 +import java.sql.SQLException;
  25 +import java.sql.Types;
  26 +import java.util.ArrayList;
  27 +import java.util.List;
  28 +import java.util.concurrent.ConcurrentLinkedQueue;
  29 +
  30 +/**
  31 + * 指令持久化线程
  32 + * Created by panzhao on 2017/3/6.
  33 + */
  34 +@Component
  35 +public class DirectivesPstThread extends Thread {
  36 +
  37 + Logger logger = LoggerFactory.getLogger(this.getClass());
  38 +
  39 + @Autowired
  40 + D60Repository d60Repository;
  41 +
  42 + @Autowired
  43 + D64Repository d64Repository;
  44 +
  45 + @Autowired
  46 + DayOfSchedule dayOfSchedule;
  47 +
  48 + @Autowired
  49 + JdbcTemplate jdbcTemplate;
  50 +
  51 + private static DateTimeFormatter fmtyyyyMMdd = DateTimeFormat.forPattern("yyyy-MM-dd");
  52 +
  53 + @Override
  54 + public void run() {
  55 + try{
  56 + ConcurrentLinkedQueue<Directive> list = DayOfDirectives.pstDirectives;
  57 +
  58 + List<D60> d60s = new ArrayList<>();
  59 + List<D64> d64s = new ArrayList<>();
  60 + //按 60 和 64 分组
  61 + Directive directive;
  62 + D60 d60;
  63 + for (int i = 0; i < 2000; i++) {
  64 + directive = list.poll();
  65 + if(null == directive)
  66 + break;
  67 +
  68 + //日期
  69 + directive.setRq(fmtyyyyMMdd.print(directive.getTimestamp()));
  70 +
  71 + if (directive instanceof D60) {
  72 + d60 = (D60) directive;
  73 + if(isDelete(d60))
  74 + continue;
  75 + d60s.add(d60);
  76 + }
  77 + else if(directive instanceof D64)
  78 + d64s.add((D64) directive);
  79 + }
  80 +
  81 + //入库60
  82 + save60(d60s);
  83 + //入库64
  84 + save64(d64s);
  85 +
  86 +
  87 + // 60 指令更新(车载响应)
  88 + ConcurrentLinkedQueue<D60> updateD60s = DayOfDirectives.pstD60s;
  89 + d60s = new ArrayList<>();
  90 + for (int i = 0; i < 2000; i++) {
  91 + d60 = updateD60s.poll();
  92 + if(null == d60)
  93 + break;
  94 + d60s.add(d60);
  95 + }
  96 +
  97 + if(d60s.size() > 0)
  98 + update60(d60s);
  99 + }catch (Exception e){
  100 + logger.error("指令入库出现异常", e);
  101 + }
  102 + }
  103 +
  104 + private void save64(final List<D64> d64s) {
  105 + if(null == d64s || d64s.size() == 0)
  106 + return;
  107 +
  108 + String sql = "insert into bsth_v_directive_64(device_id,error_text,http_code,oper_code,rq,sender,timestamp,city_code,line_id,txt_content,resp_ack) " +
  109 + " values(?,?,?,?,?,?,?,?,?,?,?)";
  110 +
  111 + //编程式事务
  112 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  113 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  114 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  115 + TransactionStatus status = tran.getTransaction(def);
  116 +
  117 + try{
  118 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  119 + @Override
  120 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  121 + D64 d64 = d64s.get(i);
  122 + ps.setString(1 , d64.getDeviceId());
  123 + ps.setString(2, isNvl(d64.getErrorText()));
  124 + ps.setInt(3, d64.getHttpCode());
  125 + ps.setShort(4, isNvl(d64.getOperCode()));
  126 + ps.setString(5, d64.getRq());
  127 +
  128 + ps.setString(6, isNvl(d64.getSender()));
  129 + ps.setLong(7, d64.getTimestamp());
  130 +
  131 + ps.setShort(8, isNvl(d64.getData().getCityCode()));
  132 + ps.setString(9, isNvl(d64.getData().getLineId()));
  133 + ps.setString(10, isNvl(d64.getData().getTxtContent()));
  134 + ps.setShort(11, isNvl(d64.getRespAck()));
  135 + }
  136 +
  137 + @Override
  138 + public int getBatchSize() {
  139 + return d64s.size();
  140 + }
  141 + });
  142 +
  143 + tran.commit(status);
  144 +
  145 + logger.info("64 入库成功: " + d64s.size());
  146 + }catch (Exception e){
  147 + logger.error(String.format("错误数据:%s", JSON.toJSONString(d64s)), e);
  148 + tran.rollback(status);
  149 + }
  150 + }
  151 +
  152 + private void update60(final List<D60> d60s) {
  153 + if(null == d60s || d60s.size() == 0)
  154 + return;
  155 +
  156 + String sql = "update bsth_v_directive_60 set reply46=?,reply46time=?,reply47=?,reply47time=? where device_id=? and timestamp=? and msg_id=?";
  157 +
  158 + //编程式事务
  159 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  160 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  161 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  162 + TransactionStatus status = tran.getTransaction(def);
  163 +
  164 + try{
  165 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  166 + @Override
  167 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  168 + D60 d60 = d60s.get(i);
  169 + ps.setShort(1, isNvl(d60.getReply46()));
  170 + if(null == d60.getReply46Time())
  171 + ps.setNull(2, Types.BIGINT);
  172 + else
  173 + ps.setLong(2, d60.getReply46Time());
  174 +
  175 + ps.setShort(3, isNvl(d60.getReply47()));
  176 +
  177 + if(null == d60.getReply47Time())
  178 + ps.setNull(4, Types.BIGINT);
  179 + else
  180 + ps.setLong(4, d60.getReply47Time());
  181 + ps.setString(5, d60.getDeviceId());
  182 + ps.setLong(6, d60.getTimestamp());
  183 + ps.setInt(7, d60.getMsgId());
  184 + }
  185 +
  186 + @Override
  187 + public int getBatchSize() {
  188 + return d60s.size();
  189 + }
  190 + });
  191 +
  192 + tran.commit(status);
  193 +
  194 + logger.info("60 更新成功: " + d60s.size());
  195 + }catch (Exception e){
  196 + logger.error(String.format("错误数据:%s", JSON.toJSONString(d60s)), e);
  197 + tran.rollback(status);
  198 + }
  199 + }
  200 +
  201 + private void save60(final List<D60> d60s) {
  202 + if(null == d60s || d60s.size() == 0)
  203 + return;
  204 +
  205 + String sql = "insert into bsth_v_directive_60(device_id,error_text,http_code,oper_code,rq,sender,timestamp" +
  206 + ",alarm_time,company_code,dispatch_instruct,instruct_type,msg_id,service_state,txt_content,is_dispatch" +
  207 + ",line_code,reply46,reply46time,reply47,reply47time,sch) " +
  208 + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
  209 +
  210 + //编程式事务
  211 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  212 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  213 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  214 + TransactionStatus status = tran.getTransaction(def);
  215 +
  216 + try{
  217 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  218 + @Override
  219 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  220 + D60 d60 = d60s.get(i);
  221 + ps.setString(1, d60.getDeviceId());
  222 + ps.setString(2, isNvl(d60.getErrorText()));
  223 + ps.setInt(3, d60.getHttpCode());
  224 + ps.setShort(4, d60.getOperCode());
  225 + ps.setString(5, d60.getRq());
  226 + ps.setString(6, d60.getSender());
  227 + ps.setLong(7, d60.getTimestamp());
  228 +
  229 + ps.setLong(8, isNvl(d60.getData().getAlarmTime()));
  230 + ps.setShort(9, isNvl(d60.getData().getCompanyCode()));
  231 + ps.setShort(10, isNvl(d60.getData().getDispatchInstruct()));
  232 + ps.setInt(11, d60.getData().getInstructType());
  233 + ps.setInt(12, d60.getData().getMsgId());
  234 + ps.setLong(13, d60.getData().getServiceState());
  235 + ps.setString(14, d60.getData().getTxtContent());
  236 + ps.setBoolean(15, d60.isDispatch());
  237 +
  238 + ps.setString(16, isNvl(d60.getLineCode()));
  239 + ps.setShort(17, isNvl(d60.getReply46()));
  240 +
  241 + if(null == d60.getReply46Time())
  242 + ps.setNull(18, Types.BIGINT);
  243 + else
  244 + ps.setLong(18, d60.getReply46Time());
  245 +
  246 + ps.setShort(19, isNvl(d60.getReply47()));
  247 +
  248 + if(null == d60.getReply47Time())
  249 + ps.setNull(20, Types.BIGINT);
  250 + else
  251 + ps.setLong(20, d60.getReply47Time());
  252 +
  253 + if(d60.getSch()==null)
  254 + ps.setNull(21, Types.BIGINT);
  255 + else
  256 + ps.setLong(21, d60.getSch().getId());
  257 + }
  258 +
  259 + @Override
  260 + public int getBatchSize() {
  261 + return d60s.size();
  262 + }
  263 + });
  264 +
  265 + tran.commit(status);
  266 +
  267 + logger.info("60 入库成功: " + d60s.size());
  268 + }catch (Exception e){
  269 + logger.error(String.format("错误数据:%s", JSON.toJSONString(d60s)), e);
  270 + tran.rollback(status);
  271 + }
  272 + }
  273 +
  274 + private String isNvl(String v) {
  275 + return v==null?"":v;
  276 + }
  277 +
  278 + private short isNvl(Short v) {
  279 + return v==null?0:v;
  280 + }
  281 +
  282 + private long isNvl(Long v) {
  283 + return v==null?0:v;
  284 + }
  285 +
  286 + private boolean isDelete(D60 d60){
  287 + try{
  288 + //如果关联的班次已经不存在了,放弃入库,很低概率出现
  289 + if(d60.isDispatch() && d60.getSch().isDeleted()){
  290 + logger.warn("save 指令,发现 deleted=true 的班次,id=" + d60.getSch().getId());
  291 + return true;
  292 + }
  293 + }catch (Exception e){
  294 + logger.error("", e);
  295 + }
  296 +
  297 + return false;
  298 + }
  299 +}
... ...