Commit 58857a3c101f298ff41aac2a79e8c50c493c75cf

Authored by 娄高锋
2 parents f6ff2976 da339ef1

Merge branch 'pudong' of 192.168.168.201:panzhaov5/bsth_control into pudong

src/main/java/com/bsth/controller/realcontrol/AdminUtilsController.java
1 -package com.bsth.controller.realcontrol;  
2 -  
3 -import com.bsth.data.directive.DayOfDirectives;  
4 -import com.bsth.data.gpsdata_v2.cache.GeoCacheData;  
5 -import com.bsth.data.gpsdata_v2.handlers.overspeed.OverspeedProcess;  
6 -import com.bsth.data.gpsdata_v2.thread.GpsDataLoaderThread;  
7 -import com.bsth.data.msg_queue.DirectivePushQueue;  
8 -import com.bsth.data.msg_queue.WebSocketPushQueue;  
9 -import com.bsth.data.pilot80.PilotReport;  
10 -import com.bsth.data.schedule.DayOfSchedule;  
11 -import com.bsth.entity.realcontrol.ScheduleRealInfo;  
12 -import com.bsth.websocket.handler.SendUtils;  
13 -import org.slf4j.Logger;  
14 -import org.slf4j.LoggerFactory;  
15 -import org.springframework.beans.factory.annotation.Autowired;  
16 -import org.springframework.web.bind.annotation.RequestMapping;  
17 -import org.springframework.web.bind.annotation.RequestParam;  
18 -import org.springframework.web.bind.annotation.RestController;  
19 -  
20 -import java.util.HashMap;  
21 -import java.util.List;  
22 -import java.util.Map;  
23 -  
24 -/**  
25 - * Created by panzhao on 2017/4/14.  
26 - */  
27 -@RestController  
28 -@RequestMapping("adminUtils")  
29 -public class AdminUtilsController {  
30 -  
31 -  
32 - Logger logger = LoggerFactory.getLogger(this.getClass());  
33 -  
34 - @Autowired  
35 - DayOfSchedule dayOfSchedule;  
36 -  
37 - @Autowired  
38 - GeoCacheData geoCacheData;  
39 -  
40 - @Autowired  
41 - DayOfDirectives dayOfDirectives;  
42 -  
43 - @Autowired  
44 - SendUtils sendUtils;  
45 -  
46 - @Autowired  
47 - PilotReport pilotReport;  
48 -  
49 - /**  
50 - * 出现重复班次的车辆  
51 - * @param  
52 -  
53 - @RequestMapping(value = "/schRepeat", method = RequestMethod.POST)  
54 - public void schRepeat(@RequestParam String nbbm){  
55 - logger.info("前端通知,车辆 " + nbbm + "出现重复班次,开始检测...");  
56 - List<ScheduleRealInfo> list = dayOfSchedule.findByNbbm(nbbm);  
57 - logger.info("检测前,车辆班次数量:" + list.size());  
58 -  
59 - Map<Long, ScheduleRealInfo> map = new HashMap<>();  
60 - for(ScheduleRealInfo sch : list){  
61 - if(map.containsKey(sch.getId())){  
62 - logger.info("检测到重复ID: " + sch.getId());  
63 - }  
64 - map.put(sch.getId(), sch);  
65 - }  
66 -  
67 - logger.info("检测后,车辆班次数量:" + list.size());  
68 - if(map.values().size() > 0){  
69 - dayOfSchedule.replaceByNbbm(nbbm, map.values());  
70 - }  
71 - }*/  
72 -  
73 -/* @RequestMapping(value = "/directivePushQueue")  
74 - public void directivePushQueue(){  
75 - DirectivePushQueue.start();  
76 - }*/  
77 -  
78 - @RequestMapping(value = "/directiveQueueSize")  
79 - public void directiveQueueSize(){  
80 - DirectivePushQueue.size();  
81 - }  
82 -  
83 - /*@RequestMapping(value = "/webSocketPushQueue")  
84 - public void webSocketPushQueue(){  
85 - WebSocketPushQueue.start();  
86 - }*/  
87 -  
88 - @RequestMapping(value = "/webSocketQueueSize")  
89 - public void webSocketQueueSize(){  
90 - WebSocketPushQueue.size();  
91 - }  
92 -  
93 - @RequestMapping(value = "/setHttpFlag")  
94 - public void setHttpFlag(@RequestParam int flag){  
95 - if(flag != 0 && flag != -1)  
96 - return;  
97 - GpsDataLoaderThread.setFlag(flag);  
98 - }  
99 -  
100 - @RequestMapping(value = "/updateCacheBuff")  
101 - public void updateCacheBuff(){  
102 - geoCacheData.loadData();  
103 - }  
104 -  
105 - @RequestMapping(value = "/reCalcLpSch")  
106 - public void reCalcLpSch(){  
107 - dayOfSchedule._test_reCalcLpSch();  
108 - }  
109 -  
110 - @RequestMapping(value = "/findSchByLpName")  
111 - public List<ScheduleRealInfo> findSchByLpName(@RequestParam String lpName){  
112 - return dayOfSchedule.getLpScheduleMap().get(lpName);  
113 - }  
114 -  
115 - @RequestMapping(value = "/findSchByNbbm")  
116 - public List<ScheduleRealInfo> findSchByNbbm(@RequestParam String nbbm){  
117 - return dayOfSchedule.findByNbbm(nbbm);  
118 - }  
119 -  
120 - @RequestMapping(value = "/removeExecPlan")  
121 - public int removeExecPlan(@RequestParam String nbbm){  
122 - dayOfSchedule.removeExecPlan(nbbm);  
123 - return 1;  
124 - }  
125 -  
126 - @RequestMapping(value = "/sch_re_calc_id_maps")  
127 - public int reCalcIdMaps(){  
128 - return dayOfSchedule.reCalcIdMaps();  
129 - }  
130 -  
131 - @RequestMapping(value = "/sch_size_string")  
132 - public String schSizeString(){  
133 - return dayOfSchedule.sizeString();  
134 - }  
135 -  
136 - @RequestMapping(value = "/containerSize")  
137 - public Map<String, Integer> containerSize(){  
138 - Map<String, Integer> rs = new HashMap<>();  
139 - rs.put("60_size", dayOfDirectives.all60().size());  
140 - rs.put("80_size", pilotReport.findAll().size());  
141 - rs.put("nbbm_sch_size", dayOfSchedule.findAll().size());  
142 - rs.put("lp_sch_size", dayOfSchedule.findAllByLpContainer().size());  
143 - rs.put("id_sch_size", dayOfSchedule.findAllByIdContainer().size());  
144 - rs.put("pst_sch_size", dayOfSchedule.getPstSize());  
145 - rs.put("speeds_size", OverspeedProcess.size());  
146 - return rs;  
147 - }  
148 -  
149 - @RequestMapping(value = "/websocketRadioText")  
150 - public int radioText(String t, String lineCode){  
151 - sendUtils.sendRadioText(t, lineCode);  
152 - return 0;  
153 - }  
154 -} 1 +package com.bsth.controller.realcontrol;
  2 +
  3 +import com.bsth.data.directive.DayOfDirectives;
  4 +import com.bsth.data.directive.DirectivesPstThread;
  5 +import com.bsth.data.gpsdata_v2.cache.GeoCacheData;
  6 +import com.bsth.data.gpsdata_v2.handlers.overspeed.OverspeedProcess;
  7 +import com.bsth.data.gpsdata_v2.thread.GpsDataLoaderThread;
  8 +import com.bsth.data.msg_queue.DirectivePushQueue;
  9 +import com.bsth.data.msg_queue.WebSocketPushQueue;
  10 +import com.bsth.data.pilot80.PilotReport;
  11 +import com.bsth.data.schedule.DayOfSchedule;
  12 +import com.bsth.entity.realcontrol.ScheduleRealInfo;
  13 +import com.bsth.websocket.handler.SendUtils;
  14 +import org.slf4j.Logger;
  15 +import org.slf4j.LoggerFactory;
  16 +import org.springframework.beans.factory.annotation.Autowired;
  17 +import org.springframework.web.bind.annotation.RequestMapping;
  18 +import org.springframework.web.bind.annotation.RequestParam;
  19 +import org.springframework.web.bind.annotation.RestController;
  20 +
  21 +import java.util.HashMap;
  22 +import java.util.List;
  23 +import java.util.Map;
  24 +
  25 +/**
  26 + * Created by panzhao on 2017/4/14.
  27 + */
  28 +@RestController
  29 +@RequestMapping("adminUtils")
  30 +public class AdminUtilsController {
  31 +
  32 +
  33 + Logger logger = LoggerFactory.getLogger(this.getClass());
  34 +
  35 + @Autowired
  36 + DayOfSchedule dayOfSchedule;
  37 +
  38 + @Autowired
  39 + GeoCacheData geoCacheData;
  40 +
  41 + @Autowired
  42 + DayOfDirectives dayOfDirectives;
  43 +
  44 + @Autowired
  45 + SendUtils sendUtils;
  46 +
  47 + @Autowired
  48 + PilotReport pilotReport;
  49 +
  50 + /**
  51 + * 出现重复班次的车辆
  52 + * @param
  53 +
  54 + @RequestMapping(value = "/schRepeat", method = RequestMethod.POST)
  55 + public void schRepeat(@RequestParam String nbbm){
  56 + logger.info("前端通知,车辆 " + nbbm + "出现重复班次,开始检测...");
  57 + List<ScheduleRealInfo> list = dayOfSchedule.findByNbbm(nbbm);
  58 + logger.info("检测前,车辆班次数量:" + list.size());
  59 +
  60 + Map<Long, ScheduleRealInfo> map = new HashMap<>();
  61 + for(ScheduleRealInfo sch : list){
  62 + if(map.containsKey(sch.getId())){
  63 + logger.info("检测到重复ID: " + sch.getId());
  64 + }
  65 + map.put(sch.getId(), sch);
  66 + }
  67 +
  68 + logger.info("检测后,车辆班次数量:" + list.size());
  69 + if(map.values().size() > 0){
  70 + dayOfSchedule.replaceByNbbm(nbbm, map.values());
  71 + }
  72 + }*/
  73 +
  74 +/* @RequestMapping(value = "/directivePushQueue")
  75 + public void directivePushQueue(){
  76 + DirectivePushQueue.start();
  77 + }*/
  78 +
  79 + @RequestMapping(value = "/directiveQueueSize")
  80 + public void directiveQueueSize(){
  81 + DirectivePushQueue.size();
  82 + }
  83 +
  84 + /*@RequestMapping(value = "/webSocketPushQueue")
  85 + public void webSocketPushQueue(){
  86 + WebSocketPushQueue.start();
  87 + }*/
  88 +
  89 + @RequestMapping(value = "/webSocketQueueSize")
  90 + public void webSocketQueueSize(){
  91 + WebSocketPushQueue.size();
  92 + }
  93 +
  94 + @RequestMapping(value = "/setHttpFlag")
  95 + public void setHttpFlag(@RequestParam int flag){
  96 + if(flag != 0 && flag != -1)
  97 + return;
  98 + GpsDataLoaderThread.setFlag(flag);
  99 + }
  100 +
  101 + @RequestMapping(value = "/updateCacheBuff")
  102 + public void updateCacheBuff(){
  103 + geoCacheData.loadData();
  104 + }
  105 +
  106 + @RequestMapping(value = "/reCalcLpSch")
  107 + public void reCalcLpSch(){
  108 + dayOfSchedule._test_reCalcLpSch();
  109 + }
  110 +
  111 + @RequestMapping(value = "/findSchByLpName")
  112 + public List<ScheduleRealInfo> findSchByLpName(@RequestParam String lpName){
  113 + return dayOfSchedule.getLpScheduleMap().get(lpName);
  114 + }
  115 +
  116 + @RequestMapping(value = "/findSchByNbbm")
  117 + public List<ScheduleRealInfo> findSchByNbbm(@RequestParam String nbbm){
  118 + return dayOfSchedule.findByNbbm(nbbm);
  119 + }
  120 +
  121 + @RequestMapping(value = "/removeExecPlan")
  122 + public int removeExecPlan(@RequestParam String nbbm){
  123 + dayOfSchedule.removeExecPlan(nbbm);
  124 + return 1;
  125 + }
  126 +
  127 + @RequestMapping(value = "/sch_re_calc_id_maps")
  128 + public int reCalcIdMaps(){
  129 + return dayOfSchedule.reCalcIdMaps();
  130 + }
  131 +
  132 + @RequestMapping(value = "/sch_size_string")
  133 + public String schSizeString(){
  134 + return dayOfSchedule.sizeString();
  135 + }
  136 +
  137 + @RequestMapping(value = "/containerSize")
  138 + public Map<String, Integer> containerSize(){
  139 + Map<String, Integer> rs = new HashMap<>();
  140 + rs.put("60_size", dayOfDirectives.all60().size());
  141 + rs.put("80_size", pilotReport.findAll().size());
  142 + rs.put("nbbm_sch_size", dayOfSchedule.findAll().size());
  143 + rs.put("lp_sch_size", dayOfSchedule.findAllByLpContainer().size());
  144 + rs.put("id_sch_size", dayOfSchedule.findAllByIdContainer().size());
  145 + rs.put("pst_sch_size", dayOfSchedule.getPstSize());
  146 + rs.put("speeds_size", OverspeedProcess.size());
  147 + rs.put("60_pst_size", DayOfDirectives.pstSize());
  148 + return rs;
  149 + }
  150 +
  151 + @RequestMapping(value = "/websocketRadioText")
  152 + public int radioText(String t, String lineCode){
  153 + sendUtils.sendRadioText(t, lineCode);
  154 + return 0;
  155 + }
  156 +
  157 + @Autowired
  158 + DirectivesPstThread directivesPstThread;
  159 +
  160 + @RequestMapping(value = "/_sd_60_pst")
  161 + public void sd_60_pst(){
  162 + logger.info("手动入库指令....");
  163 + directivesPstThread.run();
  164 + }
  165 +}
155 \ No newline at end of file 166 \ No newline at end of file
src/main/java/com/bsth/controller/realcontrol/ServiceDataInterface.java
@@ -194,15 +194,10 @@ public class ServiceDataInterface { @@ -194,15 +194,10 @@ public class ServiceDataInterface {
194 int code = GatewayHttpUtils.postJson(JSON.toJSONString(d60)); 194 int code = GatewayHttpUtils.postJson(JSON.toJSONString(d60));
195 d60.setHttpCode(code); 195 d60.setHttpCode(code);
196 196
197 - if (code == 0) {  
198 - // 添加到缓存  
199 - dayOfDirectives.put60(d60, true);  
200 - } else { 197 + if (code != 0)
201 d60.setErrorText("网关通讯失败, code: " + code); 198 d60.setErrorText("网关通讯失败, code: " + code);
202 - d60Repository.save(d60);  
203 - dayOfDirectives.put60(d60, false);  
204 - }  
205 199
  200 + dayOfDirectives.put60(d60);
206 return d60.getMsgId(); 201 return d60.getMsgId();
207 }catch (Exception e){ 202 }catch (Exception e){
208 logger.error("", e); 203 logger.error("", e);
src/main/java/com/bsth/data/directive/DayOfDirectives.java
@@ -35,8 +35,10 @@ public class DayOfDirectives { @@ -35,8 +35,10 @@ public class DayOfDirectives {
35 // 线路切换指令 64 35 // 线路切换指令 64
36 public static ConcurrentMap<String, D64> d64Map; 36 public static ConcurrentMap<String, D64> d64Map;
37 37
38 - //等待入库的指令 38 + //等待插入的指令
39 public static LinkedList<Directive> pstDirectives; 39 public static LinkedList<Directive> pstDirectives;
  40 + //等待更新的指令
  41 + public static LinkedList<D60> pstD60s;
40 42
41 @Autowired 43 @Autowired
42 DirectiveService directiveService; 44 DirectiveService directiveService;
@@ -54,13 +56,13 @@ public class DayOfDirectives { @@ -54,13 +56,13 @@ public class DayOfDirectives {
54 d60Map = new ConcurrentHashMap<>(); 56 d60Map = new ConcurrentHashMap<>();
55 d64Map = new ConcurrentHashMap<>(); 57 d64Map = new ConcurrentHashMap<>();
56 pstDirectives = new LinkedList<>(); 58 pstDirectives = new LinkedList<>();
  59 + pstD60s = new LinkedList<>();
57 } 60 }
58 61
59 - public void put60(D60 d60, boolean pst) { 62 + public void put60(D60 d60) {
60 d60Map.put(d60.getMsgId(), d60); 63 d60Map.put(d60.getMsgId(), d60);
61 //等待持久化 64 //等待持久化
62 - if(pst)  
63 - pstDirectives.add(d60); 65 + pstDirectives.add(d60);
64 } 66 }
65 67
66 public void put64(D64 d64) { 68 public void put64(D64 d64) {
@@ -69,6 +71,10 @@ public class DayOfDirectives { @@ -69,6 +71,10 @@ public class DayOfDirectives {
69 pstDirectives.add(d64); 71 pstDirectives.add(d64);
70 } 72 }
71 73
  74 + public static int pstSize(){
  75 + return pstDirectives.size();
  76 + }
  77 +
72 /** 78 /**
73 * 79 *
74 * @Title: reply @Description: TODO(指令 46,47 响应) @throws 80 * @Title: reply @Description: TODO(指令 46,47 响应) @throws
@@ -100,9 +106,9 @@ public class DayOfDirectives { @@ -100,9 +106,9 @@ public class DayOfDirectives {
100 d60.setReply47Time(System.currentTimeMillis()); 106 d60.setReply47Time(System.currentTimeMillis());
101 break; 107 break;
102 } 108 }
103 - // 等待持久化  
104 - if(!pstDirectives.contains(d60))  
105 - pstDirectives.add(d60); 109 +
  110 + //更新60数据
  111 + pstD60s.add(d60);
106 112
107 ScheduleRealInfo sch = d60.getSch(); 113 ScheduleRealInfo sch = d60.getSch();
108 if (null == sch) 114 if (null == sch)
@@ -133,110 +139,16 @@ public class DayOfDirectives { @@ -133,110 +139,16 @@ public class DayOfDirectives {
133 if (null == data) 139 if (null == data)
134 logger.warn("64响应 data is null ,json: " + json); 140 logger.warn("64响应 data is null ,json: " + json);
135 else { 141 else {
136 - d64.setRespAck(data.getShort("requestAck"));  
137 - // 持久化  
138 - if(!pstDirectives.contains(d64))  
139 - pstDirectives.add(d64); 142 + logger.info(d64.getDeviceId() + "_" + d64.getData().getLineId() + "响应:" + data.getShort("requestAck"));
  143 + /*d64.setRespAck(data.getShort("requestAck"));
  144 + // 持久化*/
  145 + //64 响应不入库了...
140 } 146 }
141 } 147 }
142 } 148 }
143 149
144 -/* private void saveD60(D60 d60) {  
145 - // 等47再入库  
146 - if (d60.getReply47() == null)  
147 - return;  
148 -  
149 - directiveService.save(d60);  
150 - }  
151 -  
152 - public void clear(String lineCode){  
153 - int c60 = 0, c64 = 0;  
154 -  
155 - Collection<D60> d60s = d60Map.values();  
156 - List<D60> rem60List = new ArrayList<>();  
157 - for(D60 d60 : d60s){  
158 - if(d60.getLineCode().equals(lineCode))  
159 - rem60List.add(d60);  
160 - }  
161 -  
162 - //清除60数据  
163 - for(D60 d60 : rem60List){  
164 - if(d60.getReply47() == null)  
165 - directiveService.save(d60);  
166 - if(null != d60Map.remove(d60.getMsgId()))  
167 - c60 ++;  
168 - }  
169 -  
170 - rem60List.clear();  
171 - if(c60 > 0)  
172 - logger.info("清除60数据 ," + c60);  
173 -  
174 - //找到该设备的64数据  
175 - Collection<D64> d64s = d64Map.values();  
176 - List<D64> rem64List = new ArrayList<>();  
177 - for(D64 d64 : d64s){  
178 - if(d64.get)  
179 - rem64List.add(d64);  
180 - }  
181 -  
182 - //清除64数据  
183 - for(D64 d64 : rem64List){  
184 - if(d64.getRespAck() == null)  
185 - directiveService.save64(d64);  
186 -  
187 - if(null != d64Map.remove(d64.getKey()))  
188 - c64 ++;  
189 - }  
190 -  
191 - rem64List.clear();  
192 - if(c64 > 0)  
193 - logger.info("清除64数据 ," + c64);  
194 - }*/  
195 -  
196 -/* public void clear(String device){  
197 - int c60 = 0, c64 = 0;  
198 -  
199 - Collection<D60> d60s = d60Map.values();  
200 - List<D60> rem60List = new ArrayList<>();  
201 - for(D60 d60 : d60s){  
202 - if(device.equals(d60.getDeviceId()))  
203 - rem60List.add(d60);  
204 - }  
205 -  
206 - //清除60数据  
207 - for(D60 d60 : rem60List){  
208 - if(d60.getReply47() == null)  
209 - directiveService.save(d60);  
210 - if(null != d60Map.remove(d60.getMsgId()))  
211 - c60 ++;  
212 - }  
213 -  
214 - rem60List.clear();  
215 - if(c60 > 0)  
216 - logger.info("清除60数据 ," + c60);  
217 -  
218 - //找到该设备的64数据  
219 - Collection<D64> d64s = d64Map.values();  
220 - List<D64> rem64List = new ArrayList<>();  
221 - for(D64 d64 : d64s){  
222 - if(device.equals(d64.getDeviceId()))  
223 - rem64List.add(d64);  
224 - }  
225 -  
226 - //清除64数据  
227 - for(D64 d64 : rem64List){  
228 - if(d64.getRespAck() == null)  
229 - directiveService.save64(d64);  
230 -  
231 - if(null != d64Map.remove(d64.getKey()))  
232 - c64 ++;  
233 - }  
234 -  
235 - rem64List.clear();  
236 - if(c64 > 0)  
237 - logger.info("清除64数据 ," + c64);  
238 - }*/  
239 - 150 + @Autowired
  151 + DirectivesPstThread directivesPstThread;
240 public void clearAll(){ 152 public void clearAll(){
241 d60Map = new ConcurrentHashMap<>(); 153 d60Map = new ConcurrentHashMap<>();
242 d64Map = new ConcurrentHashMap<>(); 154 d64Map = new ConcurrentHashMap<>();
src/main/java/com/bsth/data/directive/DirectivesPstThread.java
1 package com.bsth.data.directive; 1 package com.bsth.data.directive;
2 2
  3 +import com.alibaba.fastjson.JSON;
3 import com.bsth.data.schedule.DayOfSchedule; 4 import com.bsth.data.schedule.DayOfSchedule;
4 import com.bsth.entity.directive.D60; 5 import com.bsth.entity.directive.D60;
5 import com.bsth.entity.directive.D64; 6 import com.bsth.entity.directive.D64;
6 import com.bsth.entity.directive.Directive; 7 import com.bsth.entity.directive.Directive;
7 -import com.bsth.entity.realcontrol.ScheduleRealInfo;  
8 import com.bsth.repository.directive.D60Repository; 8 import com.bsth.repository.directive.D60Repository;
9 import com.bsth.repository.directive.D64Repository; 9 import com.bsth.repository.directive.D64Repository;
10 import org.joda.time.format.DateTimeFormat; 10 import org.joda.time.format.DateTimeFormat;
@@ -12,9 +12,20 @@ import org.joda.time.format.DateTimeFormatter; @@ -12,9 +12,20 @@ import org.joda.time.format.DateTimeFormatter;
12 import org.slf4j.Logger; 12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory; 13 import org.slf4j.LoggerFactory;
14 import org.springframework.beans.factory.annotation.Autowired; 14 import org.springframework.beans.factory.annotation.Autowired;
  15 +import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  16 +import org.springframework.jdbc.core.JdbcTemplate;
  17 +import org.springframework.jdbc.datasource.DataSourceTransactionManager;
15 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
  19 +import org.springframework.transaction.TransactionDefinition;
  20 +import org.springframework.transaction.TransactionStatus;
  21 +import org.springframework.transaction.support.DefaultTransactionDefinition;
16 22
  23 +import java.sql.PreparedStatement;
  24 +import java.sql.SQLException;
  25 +import java.sql.Types;
  26 +import java.util.ArrayList;
17 import java.util.LinkedList; 27 import java.util.LinkedList;
  28 +import java.util.List;
18 29
19 /** 30 /**
20 * 指令持久化线程 31 * 指令持久化线程
@@ -34,40 +45,259 @@ public class DirectivesPstThread extends Thread { @@ -34,40 +45,259 @@ public class DirectivesPstThread extends Thread {
34 @Autowired 45 @Autowired
35 DayOfSchedule dayOfSchedule; 46 DayOfSchedule dayOfSchedule;
36 47
  48 + @Autowired
  49 + JdbcTemplate jdbcTemplate;
  50 +
37 private static DateTimeFormatter fmtyyyyMMdd = DateTimeFormat.forPattern("yyyy-MM-dd"); 51 private static DateTimeFormatter fmtyyyyMMdd = DateTimeFormat.forPattern("yyyy-MM-dd");
38 52
39 @Override 53 @Override
40 public void run() { 54 public void run() {
  55 + try{
  56 + LinkedList<Directive> list = DayOfDirectives.pstDirectives;
  57 + logger.info("开始指令入库: " + list.size());
41 58
42 - LinkedList<Directive> list = DayOfDirectives.pstDirectives;  
43 -  
44 - Directive directive;  
45 - for (int i = 0; i < 1000; i++) {  
46 - try { 59 + List<D60> d60s = new ArrayList<>();
  60 + List<D64> d64s = new ArrayList<>();
  61 + //按 60 和 64 分组
  62 + Directive directive;
  63 + D60 d60;
  64 + for (int i = 0; i < 2000; i++) {
47 directive = list.poll(); 65 directive = list.poll();
48 if(null == directive) 66 if(null == directive)
49 break; 67 break;
  68 +
50 //日期 69 //日期
51 directive.setRq(fmtyyyyMMdd.print(directive.getTimestamp())); 70 directive.setRq(fmtyyyyMMdd.print(directive.getTimestamp()));
  71 +
52 if (directive instanceof D60) { 72 if (directive instanceof D60) {
53 - D60 d60 = (D60) directive;  
54 - if(d60.isDispatch()){  
55 - ScheduleRealInfo sch = d60.getSch();  
56 - //如果关联的班次已经不存在了,放弃入库  
57 - if(sch.isDeleted()){  
58 - logger.warn("save 指令,发现 deleted=true 的班次,id=" + sch.getId());  
59 - continue;  
60 - }  
61 - }  
62 - d60Repository.save(d60); 73 + d60 = (D60) directive;
  74 + if(isDelete(d60))
  75 + continue;
  76 + d60s.add(d60);
  77 + }
  78 + else if(directive instanceof D64)
  79 + d64s.add((D64) directive);
  80 + }
  81 +
  82 + //入库60
  83 + save60(d60s);
  84 + //入库64
  85 + save64(d64s);
  86 +
  87 +
  88 + // 60 指令更新(车载响应)
  89 + LinkedList<D60> updateD60s = DayOfDirectives.pstD60s;
  90 + d60s = new ArrayList<>();
  91 + for (int i = 0; i < 2000; i++) {
  92 + d60 = updateD60s.poll();
  93 + if(null == d60)
  94 + break;
  95 + d60s.add(d60);
  96 + }
  97 +
  98 + if(d60s.size() > 0)
  99 + update60(d60s);
  100 + }catch (Exception e){
  101 + logger.error("指令入库出现异常", e);
  102 + }
  103 + }
  104 +
  105 + private void save64(final List<D64> d64s) {
  106 + if(null == d64s || d64s.size() == 0)
  107 + return;
  108 +
  109 + String sql = "insert into bsth_v_directive_64(device_id,error_text,http_code,oper_code,rq,sender,timestamp,city_code,line_id,txt_content,resp_ack) " +
  110 + " values(?,?,?,?,?,?,?,?,?,?,?)";
  111 +
  112 + //编程式事务
  113 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  114 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  115 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  116 + TransactionStatus status = tran.getTransaction(def);
  117 +
  118 + try{
  119 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  120 + @Override
  121 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  122 + D64 d64 = d64s.get(i);
  123 + ps.setString(1 , d64.getDeviceId());
  124 + ps.setString(2, isNvl(d64.getErrorText()));
  125 + ps.setInt(3, d64.getHttpCode());
  126 + ps.setShort(4, isNvl(d64.getOperCode()));
  127 + ps.setString(5, d64.getRq());
  128 +
  129 + ps.setString(6, isNvl(d64.getSender()));
  130 + ps.setLong(7, d64.getTimestamp());
  131 +
  132 + ps.setShort(8, isNvl(d64.getData().getCityCode()));
  133 + ps.setString(9, isNvl(d64.getData().getLineId()));
  134 + ps.setString(10, isNvl(d64.getData().getTxtContent()));
  135 + ps.setShort(11, isNvl(d64.getRespAck()));
  136 + }
  137 +
  138 + @Override
  139 + public int getBatchSize() {
  140 + return d64s.size();
63 } 141 }
  142 + });
  143 +
  144 + tran.commit(status);
  145 +
  146 + logger.info("64 入库成功: " + d64s.size());
  147 + }catch (Exception e){
  148 + tran.rollback(status);
  149 + logger.error("", e);
  150 + logger.warn("失败的数据:" + JSON.toJSONString(d64s));
  151 + }
  152 + }
  153 +
  154 + private void update60(final List<D60> d60s) {
  155 + if(null == d60s || d60s.size() == 0)
  156 + return;
  157 +
  158 + String sql = "update bsth_v_directive_60 set reply46=?,reply46time=?,reply47=?,reply47time=? where device_id=? and timestamp=? and msg_id=?";
  159 +
  160 + //编程式事务
  161 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  162 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  163 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  164 + TransactionStatus status = tran.getTransaction(def);
  165 +
  166 + try{
  167 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  168 + @Override
  169 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  170 + D60 d60 = d60s.get(i);
  171 + ps.setShort(1, isNvl(d60.getReply46()));
  172 + if(null == d60.getReply46Time())
  173 + ps.setNull(2, Types.BIGINT);
  174 + else
  175 + ps.setLong(2, d60.getReply46Time());
  176 +
  177 + ps.setShort(3, isNvl(d60.getReply47()));
64 178
65 - if (directive instanceof D64) {  
66 - d64Repository.save((D64) directive); 179 + if(null == d60.getReply47Time())
  180 + ps.setNull(4, Types.BIGINT);
  181 + else
  182 + ps.setLong(4, d60.getReply47Time());
  183 + ps.setString(5, d60.getDeviceId());
  184 + ps.setLong(6, d60.getTimestamp());
  185 + ps.setInt(7, d60.getMsgId());
67 } 186 }
68 - } catch (Exception e) {  
69 - logger.error("", e); 187 +
  188 + @Override
  189 + public int getBatchSize() {
  190 + return d60s.size();
  191 + }
  192 + });
  193 +
  194 + tran.commit(status);
  195 +
  196 + logger.info("60 更新成功: " + d60s.size());
  197 + }catch (Exception e){
  198 + tran.rollback(status);
  199 + logger.error("", e);
  200 + logger.warn("失败的数据:" + JSON.toJSONString(d60s));
  201 + }
  202 + }
  203 +
  204 + private void save60(final List<D60> d60s) {
  205 + if(null == d60s || d60s.size() == 0)
  206 + return;
  207 +
  208 + String sql = "insert into bsth_v_directive_60(device_id,error_text,http_code,oper_code,rq,sender,timestamp" +
  209 + ",alarm_time,company_code,dispatch_instruct,instruct_type,msg_id,service_state,txt_content,is_dispatch" +
  210 + ",line_code,reply46,reply46time,reply47,reply47time,sch) " +
  211 + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
  212 +
  213 + //编程式事务
  214 + DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
  215 + DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  216 + def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  217 + TransactionStatus status = tran.getTransaction(def);
  218 +
  219 + try{
  220 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
  221 + @Override
  222 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  223 + D60 d60 = d60s.get(i);
  224 + ps.setString(1, d60.getDeviceId());
  225 + ps.setString(2, isNvl(d60.getErrorText()));
  226 + ps.setInt(3, d60.getHttpCode());
  227 + ps.setShort(4, d60.getOperCode());
  228 + ps.setString(5, d60.getRq());
  229 + ps.setString(6, d60.getSender());
  230 + ps.setLong(7, d60.getTimestamp());
  231 +
  232 + ps.setLong(8, isNvl(d60.getData().getAlarmTime()));
  233 + ps.setShort(9, isNvl(d60.getData().getCompanyCode()));
  234 + ps.setShort(10, isNvl(d60.getData().getDispatchInstruct()));
  235 + ps.setInt(11, d60.getData().getInstructType());
  236 + ps.setInt(12, d60.getData().getMsgId());
  237 + ps.setLong(13, d60.getData().getServiceState());
  238 + ps.setString(14, d60.getData().getTxtContent());
  239 + ps.setBoolean(15, d60.isDispatch());
  240 +
  241 + ps.setString(16, isNvl(d60.getLineCode()));
  242 + ps.setShort(17, isNvl(d60.getReply46()));
  243 +
  244 + if(null == d60.getReply46Time())
  245 + ps.setNull(18, Types.BIGINT);
  246 + else
  247 + ps.setLong(18, d60.getReply46Time());
  248 +
  249 + ps.setShort(19, isNvl(d60.getReply47()));
  250 +
  251 + if(null == d60.getReply47Time())
  252 + ps.setNull(20, Types.BIGINT);
  253 + else
  254 + ps.setLong(20, d60.getReply47Time());
  255 +
  256 + if(d60.getSch()==null)
  257 + ps.setNull(21, Types.BIGINT);
  258 + else
  259 + ps.setLong(21, d60.getSch().getId());
  260 + }
  261 +
  262 + @Override
  263 + public int getBatchSize() {
  264 + return d60s.size();
  265 + }
  266 + });
  267 +
  268 + tran.commit(status);
  269 +
  270 + logger.info("60 入库成功: " + d60s.size());
  271 + }catch (Exception e){
  272 + tran.rollback(status);
  273 + logger.error("", e);
  274 + logger.warn("失败的数据:" + JSON.toJSONString(d60s));
  275 + }
  276 + }
  277 +
  278 + private String isNvl(String v) {
  279 + return v==null?"":v;
  280 + }
  281 +
  282 + private short isNvl(Short v) {
  283 + return v==null?0:v;
  284 + }
  285 +
  286 + private long isNvl(Long v) {
  287 + return v==null?0:v;
  288 + }
  289 +
  290 + private boolean isDelete(D60 d60){
  291 + try{
  292 + //如果关联的班次已经不存在了,放弃入库,很低概率出现
  293 + if(d60.isDispatch() && d60.getSch().isDeleted()){
  294 + logger.warn("save 指令,发现 deleted=true 的班次,id=" + d60.getSch().getId());
  295 + return true;
70 } 296 }
  297 + }catch (Exception e){
  298 + logger.error("", e);
71 } 299 }
  300 +
  301 + return false;
72 } 302 }
73 } 303 }
src/main/java/com/bsth/data/gpsdata_v2/GpsRealData.java
@@ -124,6 +124,8 @@ public class GpsRealData { @@ -124,6 +124,8 @@ public class GpsRealData {
124 */ 124 */
125 public List<GpsEntity> getByLine(String lineCode) { 125 public List<GpsEntity> getByLine(String lineCode) {
126 NavigableSet<String> set = lineCode2Devices.get(lineCode);//实际车载 126 NavigableSet<String> set = lineCode2Devices.get(lineCode);//实际车载
  127 + if(null == set)
  128 + set = new TreeSet();
127 Set<String> nbbmSet = dayOfSchedule.findCarByLineCode(lineCode);//计划用车 129 Set<String> nbbmSet = dayOfSchedule.findCarByLineCode(lineCode);//计划用车
128 130
129 Map<String, String> nbbm2deviceMap = BasicData.deviceId2NbbmMap.inverse(); 131 Map<String, String> nbbm2deviceMap = BasicData.deviceId2NbbmMap.inverse();
src/main/java/com/bsth/data/schedule/thread/CalcOilThread.java
@@ -2,6 +2,7 @@ package com.bsth.data.schedule.thread; @@ -2,6 +2,7 @@ package com.bsth.data.schedule.thread;
2 2
3 import com.bsth.data.directive.DayOfDirectives; 3 import com.bsth.data.directive.DayOfDirectives;
4 import com.bsth.data.gpsdata_v2.handlers.overspeed.OverspeedProcess; 4 import com.bsth.data.gpsdata_v2.handlers.overspeed.OverspeedProcess;
  5 +import com.bsth.data.gpsdata_v2.thread.GpsDataLoaderThread;
5 import com.bsth.service.oil.DlbService; 6 import com.bsth.service.oil.DlbService;
6 import com.bsth.data.safe_driv.SafeDrivCenter; 7 import com.bsth.data.safe_driv.SafeDrivCenter;
7 import com.bsth.service.oil.YlbService; 8 import com.bsth.service.oil.YlbService;
@@ -51,5 +52,8 @@ public class CalcOilThread extends Thread{ @@ -51,5 +52,8 @@ public class CalcOilThread extends Thread{
51 SafeDrivCenter.clear(); 52 SafeDrivCenter.clear();
52 //清除超速缓存数据 53 //清除超速缓存数据
53 OverspeedProcess.clear(); 54 OverspeedProcess.clear();
  55 +
  56 + GpsDataLoaderThread.setFlag(0);
  57 +
54 } 58 }
55 } 59 }
src/main/java/com/bsth/filter/AccessLogFilter.java
@@ -46,6 +46,7 @@ public class AccessLogFilter extends BaseFilter { @@ -46,6 +46,7 @@ public class AccessLogFilter extends BaseFilter {
46 String url = request.getRequestURI(); 46 String url = request.getRequestURI();
47 String params = getParams(request); 47 String params = getParams(request);
48 String headers = getHeaders(request); 48 String headers = getHeaders(request);
  49 + String method = request.getMethod();
49 50
50 StringBuilder s = new StringBuilder(); 51 StringBuilder s = new StringBuilder();
51 s.append(getBlock(username + " -" + name)); 52 s.append(getBlock(username + " -" + name));
@@ -53,6 +54,7 @@ public class AccessLogFilter extends BaseFilter { @@ -53,6 +54,7 @@ public class AccessLogFilter extends BaseFilter {
53 s.append(getBlock(ip)); 54 s.append(getBlock(ip));
54 s.append(getBlock(userAgent)); 55 s.append(getBlock(userAgent));
55 s.append(getBlock(url)); 56 s.append(getBlock(url));
  57 + s.append(getBlock(method));
56 s.append(getBlock(params)); 58 s.append(getBlock(params));
57 s.append(getBlock(headers)); 59 s.append(getBlock(headers));
58 s.append(getBlock(request.getHeader("Referer"))); 60 s.append(getBlock(request.getHeader("Referer")));
src/main/java/com/bsth/service/directive/DirectiveServiceImpl.java
@@ -96,14 +96,10 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen @@ -96,14 +96,10 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
96 d60.setSender(sender); 96 d60.setSender(sender);
97 d60.setHttpCode(code); 97 d60.setHttpCode(code);
98 98
99 - if (code == 0) {  
100 - // 添加到缓存  
101 - dayOfDirectives.put60(d60, true);  
102 - } else { 99 + if (code != 0)
103 d60.setErrorText("网关通讯失败, code: " + code); 100 d60.setErrorText("网关通讯失败, code: " + code);
104 - d60Repository.save(d60);  
105 - dayOfDirectives.put60(d60, false);  
106 - } 101 +
  102 + dayOfDirectives.put60(d60);
107 return code; 103 return code;
108 } 104 }
109 105
@@ -171,16 +167,13 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen @@ -171,16 +167,13 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
171 167
172 if (code == 0) { 168 if (code == 0) {
173 sch.setDirectiveState(60); 169 sch.setDirectiveState(60);
174 - // 添加到缓存,延迟入库  
175 - dayOfDirectives.put60(d60, true);  
176 // 通知页面 170 // 通知页面
177 sendD60ToPage(sch); 171 sendD60ToPage(sch);
178 } else { 172 } else {
179 d60.setErrorText("网关通讯失败, code: " + code); 173 d60.setErrorText("网关通讯失败, code: " + code);
180 - dayOfDirectives.put60(d60, false);  
181 - d60Repository.save(d60);  
182 } 174 }
183 175
  176 + dayOfDirectives.put60(d60);
184 return code; 177 return code;
185 } 178 }
186 179
@@ -227,18 +220,15 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen @@ -227,18 +220,15 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
227 int code = GatewayHttpUtils.postJson(JSON.toJSONString(d60)); 220 int code = GatewayHttpUtils.postJson(JSON.toJSONString(d60));
228 // 添加到缓存,等待入库 221 // 添加到缓存,等待入库
229 d60.setHttpCode(code); 222 d60.setHttpCode(code);
230 - /*if (null != sch){  
231 - d60.setSch(sch);  
232 - d60.setLineCode(sch.getXlBm());  
233 - }*/  
234 223
235 - if (code == 0) {  
236 - dayOfDirectives.put60(d60, true);  
237 - } else { 224 + GpsEntity gps = gpsRealDataBuffer.getByNbbm(nbbm);
  225 + if(null != gps)
  226 + d60.setLineCode(gps.getLineId());
  227 +
  228 + if (code != 0)
238 d60.setErrorText("网关通讯失败, code: " + code); 229 d60.setErrorText("网关通讯失败, code: " + code);
239 - d60Repository.save(d60);  
240 - dayOfDirectives.put60(d60, false);  
241 - } 230 +
  231 + dayOfDirectives.put60(d60);
242 return code; 232 return code;
243 } 233 }
244 234
@@ -268,15 +258,13 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen @@ -268,15 +258,13 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
268 // 入库 258 // 入库
269 d64.setHttpCode(code); 259 d64.setHttpCode(code);
270 d64.getData().setTxtContent("切换线路[" + BasicData.lineCode2NameMap.get(lineCode) + "]"); 260 d64.getData().setTxtContent("切换线路[" + BasicData.lineCode2NameMap.get(lineCode) + "]");
271 - dayOfDirectives.put64(d64);  
272 -  
273 // 通知设备刷新线路文件,忽略结果 261 // 通知设备刷新线路文件,忽略结果
274 if (code == 0) 262 if (code == 0)
275 GatewayHttpUtils.postJson(crt.createDeviceRefreshData(deviceId, lineCode)); 263 GatewayHttpUtils.postJson(crt.createDeviceRefreshData(deviceId, lineCode));
276 else 264 else
277 d64.setErrorText(" 网关通讯失败, code: " + code); 265 d64.setErrorText(" 网关通讯失败, code: " + code);
278 266
279 - d64Repository.save(d64); 267 + dayOfDirectives.put64(d64);
280 return code; 268 return code;
281 } 269 }
282 270