Commit 1267a7c622a01793e5a5107bd9de3ee0275aa06b

Authored by 潘钊
1 parent 9a573551

update...

src/main/java/com/bsth/data/ThreadMonotor.java 0 → 100644
  1 +package com.bsth.data;
  2 +
  3 +import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
  4 +import com.bsth.data.msg_queue.DirectivePushQueue;
  5 +import com.bsth.data.msg_queue.WebSocketPushQueue;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +/**
  11 + * Created by panzhao on 2017/5/11.
  12 + */
  13 +@Component
  14 +public class ThreadMonotor extends Thread{
  15 +
  16 + Logger log = LoggerFactory.getLogger(this.getClass());
  17 +
  18 + @Override
  19 + public void run() {
  20 +
  21 + //线调GPS分析线程
  22 + if(GpsRealAnalyse.isBlock()){
  23 + log.warn("GpsRealAnalyse isBlock true !!!!");
  24 + GpsRealAnalyse.shutdown();
  25 + }
  26 +
  27 + //webSocket 消息推送队列
  28 + if(WebSocketPushQueue.isIdle()){
  29 + log.warn("WebSocketPushQueue isIdle true !!!!");
  30 + WebSocketPushQueue.start();
  31 + }
  32 +
  33 + //系统自动发送的网关指令 推送队列
  34 + if(DirectivePushQueue.isIdle()){
  35 + log.warn("DirectivePushQueue isIdle true !!!!");
  36 + DirectivePushQueue.start();
  37 + }
  38 + }
  39 +}
... ...
src/main/java/com/bsth/data/directive/DayOfDirectives.java
... ... @@ -57,10 +57,11 @@ public class DayOfDirectives {
57 57 pstDirectives = new LinkedList<>();
58 58 }
59 59  
60   - public void put60(D60 d60) {
  60 + public void put60(D60 d60, boolean pst) {
61 61 d60Map.put(d60.getMsgId(), d60);
62 62 //等待持久化
63   - pstDirectives.add(d60);
  63 + if(pst)
  64 + pstDirectives.add(d60);
64 65 }
65 66  
66 67 public void put64(D64 d64) {
... ...
src/main/java/com/bsth/data/directive/GatewayHttpUtils.java
... ... @@ -41,8 +41,8 @@ public class GatewayHttpUtils {
41 41  
42 42 //超时时间
43 43 RequestConfig requestConfig = RequestConfig.custom()
44   - .setConnectTimeout(3000).setConnectionRequestTimeout(1000)
45   - .setSocketTimeout(3000).build();
  44 + .setConnectTimeout(1500).setConnectionRequestTimeout(1000)
  45 + .setSocketTimeout(1500).build();
46 46  
47 47 HttpPost post = new HttpPost(url);
48 48  
... ...
src/main/java/com/bsth/data/gpsdata/GpsRealData.java
... ... @@ -4,7 +4,6 @@ import com.bsth.data.BasicData;
4 4 import com.bsth.data.forecast.ForecastRealServer;
5 5 import com.bsth.data.gpsdata.thread.GpsDataLoaderThread;
6 6 import com.bsth.data.gpsdata.thread.OfflineMonitorThread;
7   -import com.bsth.data.gpsdata.thread.ThreadPollMonitor;
8 7 import com.bsth.data.schedule.DayOfSchedule;
9 8 import com.bsth.entity.realcontrol.ScheduleRealInfo;
10 9 import com.google.common.collect.TreeMultimap;
... ... @@ -47,9 +46,6 @@ public class GpsRealData implements CommandLineRunner {
47 46 @Autowired
48 47 ForecastRealServer forecastRealServer;
49 48  
50   - @Autowired
51   - ThreadPollMonitor threadPollMonitor;
52   -
53 49 /**
54 50 * 构造函数
55 51 */
... ... @@ -60,15 +56,11 @@ public class GpsRealData implements CommandLineRunner {
60 56  
61 57 @Override
62 58 public void run(String... arg0) throws Exception {
63   - logger.info("gpsDataLoader,20,3");
  59 + logger.info("gpsDataLoader,30,2");
64 60 //http形式获取GPS数据
65   - //Application.mainServices.scheduleWithFixedDelay(gpsDataLoader, 20, 2, TimeUnit.SECONDS);
  61 + //Application.mainServices.scheduleWithFixedDelay(gpsDataLoader, 30, 2, TimeUnit.SECONDS);
66 62 //定时扫描掉离线
67 63 //Application.mainServices.scheduleWithFixedDelay(offlineMonitorThread, 120, 60, TimeUnit.SECONDS);
68   -
69   - //扫描GPS线程池状态
70   - //Application.mainServices.scheduleWithFixedDelay(threadPollMonitor, 60, 20, TimeUnit.SECONDS);
71   -
72 64 }
73 65  
74 66  
... ... @@ -89,19 +81,29 @@ public class GpsRealData implements CommandLineRunner {
89 81 gps.setExpectStopTime(forecastRealServer.expectStopTime(gps.getNbbm()));
90 82 }
91 83 }
92   - } catch (Exception e) {
93   - logger.error("", e);
94   - }
95 84  
96   - //刷新对照
97   - gpsMap.put(device, gps);
98   - if (StringUtils.isNotBlank(gps.getLineId())) {
99   - //站点名称
100   - gps.setStationName(getStationName(gps));
101   - lineCode2Devices.put(gps.getLineId(), device);
  85 + //刷新对照
  86 + gpsMap.put(device, gps);
  87 + if (StringUtils.isNotBlank(gps.getLineId())) {
  88 + //站点名称
  89 + gps.setStationName(getStationName(gps));
  90 + lineCode2Devices.put(gps.getLineId(), device);
  91 +
  92 + if(old != null && !gps.getLineId().equals(old.getLineId()))
  93 + lineCode2Devices.remove(old.getLineId(), device);
  94 + }
102 95  
103   - if(old != null && !gps.getLineId().equals(old.getLineId()))
104   - lineCode2Devices.remove(old.getLineId(), device);
  96 + //车辆换设备了
  97 + String nbbm = gps.getNbbm();
  98 + if(old != null && StringUtils.isNotEmpty(nbbm) && !nbbm.equals(old.getNbbm())){
  99 + List<GpsEntity> list = findByNbbm(nbbm);
  100 + for(GpsEntity g : list){
  101 + if(!g.getDeviceId().equals(device))
  102 + gpsMap.remove(g.getDeviceId());
  103 + }
  104 + }
  105 + } catch (Exception e) {
  106 + logger.error("", e);
105 107 }
106 108 }
107 109  
... ... @@ -116,6 +118,16 @@ public class GpsRealData implements CommandLineRunner {
116 118 return gpsMap.get(deviceId);
117 119 }
118 120  
  121 + public List<GpsEntity> findByNbbm(String nbbm){
  122 + Collection<GpsEntity> arr = gpsMap.values();
  123 + List<GpsEntity> rs = new ArrayList<>();
  124 + for(GpsEntity g : arr){
  125 + if(nbbm.equals(g.getNbbm()))
  126 + rs.add(g);
  127 + }
  128 + return rs;
  129 + }
  130 +
119 131 /**
120 132 * @Title: get @Description: TODO(线路编码获取GPS集合) @throws
121 133 */
... ...
src/main/java/com/bsth/data/gpsdata/arrival/GpsRealAnalyse.java
... ... @@ -11,10 +11,7 @@ import org.slf4j.LoggerFactory;
11 11 import org.springframework.beans.factory.annotation.Autowired;
12 12 import org.springframework.stereotype.Component;
13 13  
14   -import java.util.Collections;
15   -import java.util.Comparator;
16   -import java.util.List;
17   -import java.util.Set;
  14 +import java.util.*;
18 15 import java.util.concurrent.CountDownLatch;
19 16 import java.util.concurrent.ExecutorService;
20 17 import java.util.concurrent.Executors;
... ... @@ -44,15 +41,18 @@ public class GpsRealAnalyse {
44 41 @Autowired
45 42 GpsRealData gpsRealData;
46 43  
47   - static ExecutorService threadPool = Executors.newFixedThreadPool(100);
  44 + static ExecutorService threadPool = Executors.newFixedThreadPool(20);
48 45  
49 46 public static long st;
50 47 public static CountDownLatch count;
51 48  
  49 + public static boolean isBlock() {
  50 + return System.currentTimeMillis() - st > 1000 * 20;
  51 + }
  52 +
52 53 public void analyse(List<GpsEntity> list) {
53 54 try {
54 55 st = System.currentTimeMillis();
55   - //如果正在恢复数据
56 56 if (GpsDataRecovery.run)
57 57 return;
58 58  
... ... @@ -63,6 +63,7 @@ public class GpsRealAnalyse {
63 63 }
64 64  
65 65 Set<String> ks = multimap.keySet();
  66 +
66 67 logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
67 68 count = new CountDownLatch(ks.size());
68 69  
... ... @@ -125,7 +126,7 @@ public class GpsRealAnalyse {
125 126 abnormalStateHandle.handle(gps, prevs);
126 127  
127 128 if (!task)
128   - return; //无任务的,到这里就结束
  129 + continue; //无任务的,到这里就结束
129 130  
130 131 //反向处理
131 132 reverseSignalHandle.handle(gps, prevs);
... ...
src/main/java/com/bsth/data/gpsdata/arrival/handlers/InOutStationSignalHandle.java
... ... @@ -6,12 +6,12 @@ import com.bsth.data.gpsdata.arrival.SignalHandle;
6 6 import com.bsth.data.gpsdata.arrival.utils.CircleQueue;
7 7 import com.bsth.data.gpsdata.arrival.utils.ScheduleSignalState;
8 8 import com.bsth.data.gpsdata.arrival.utils.SignalSchPlanMatcher;
  9 +import com.bsth.data.msg_queue.DirectivePushQueue;
9 10 import com.bsth.data.schedule.DayOfSchedule;
10 11 import com.bsth.data.schedule.ScheduleComparator;
11 12 import com.bsth.data.schedule.late_adjust.LateAdjustHandle;
12 13 import com.bsth.entity.realcontrol.LineConfig;
13 14 import com.bsth.entity.realcontrol.ScheduleRealInfo;
14   -import com.bsth.service.directive.DirectiveService;
15 15 import com.bsth.websocket.handler.SendUtils;
16 16 import org.apache.commons.lang3.StringUtils;
17 17 import org.slf4j.Logger;
... ... @@ -41,9 +41,6 @@ public class InOutStationSignalHandle extends SignalHandle{
41 41 SendUtils sendUtils;
42 42  
43 43 @Autowired
44   - DirectiveService directiveService;
45   -
46   - @Autowired
47 44 ScheduleSignalState scheduleSignalState;
48 45  
49 46 @Autowired
... ... @@ -151,7 +148,8 @@ public class InOutStationSignalHandle extends SignalHandle{
151 148  
152 149 if(sch.getBcType().equals("out")){
153 150 //出场时,切换成营运状态
154   - directiveService.send60Operation(sch.getClZbh(), 0, Integer.parseInt(sch.getXlDir()), null, "出场@系统");
  151 + DirectivePushQueue.put6003(sch.getClZbh(), 0, Integer.parseInt(sch.getXlDir()), null, "出场@系统");
  152 + //directiveService.send60Operation(sch.getClZbh(), 0, Integer.parseInt(sch.getXlDir()), null, "出场@系统");
155 153 }
156 154 //出站既出场
157 155 outStationAndOutPark(sch);
... ... @@ -208,7 +206,8 @@ public class InOutStationSignalHandle extends SignalHandle{
208 206  
209 207 if(schPrev.getBcType().equals("out")){
210 208 //出场时,切换成营运状态
211   - directiveService.send60Operation(schPrev.getClZbh(), 0, Integer.parseInt(schPrev.getXlDir()), null, "出场@系统");
  209 + DirectivePushQueue.put6003(schPrev.getClZbh(), 0, Integer.parseInt(schPrev.getXlDir()), null, "出场@系统");
  210 + //directiveService.send60Operation(schPrev.getClZbh(), 0, Integer.parseInt(schPrev.getXlDir()), null, "出场@系统");
212 211 }
213 212 }
214 213 }
... ... @@ -276,15 +275,19 @@ public class InOutStationSignalHandle extends SignalHandle{
276 275 //将gps转换为下一个班次走向的站内信号
277 276 transformUpdown(gps, next);
278 277 //下发调度指令
279   - directiveService.send60Dispatch(next, doneSum, "到站@系统");
  278 + DirectivePushQueue.put6002(next, doneSum, "到站@系统");
  279 + //directiveService.send60Dispatch(next, doneSum, "到站@系统");
280 280  
281 281 //套跑 -下发线路切换指令
282   - if(!next.getXlBm().equals(sch.getXlBm()))
283   - directiveService.lineChange(next.getClZbh(), next.getXlBm(), "套跑@系统");
  282 + if(!next.getXlBm().equals(sch.getXlBm())){
  283 + DirectivePushQueue.put64(next.getClZbh(), next.getXlBm(), "套跑@系统");
  284 + //directiveService.lineChange(next.getClZbh(), next.getXlBm(), "套跑@系统");
  285 + }
284 286 }
285 287 else if(sch.getBcType().equals("in")){
286 288 //终班进场,切换成非营运状态
287   - directiveService.send60Operation(sch.getClZbh(), 1, Integer.parseInt(sch.getXlDir()), null, "进场@系统");
  289 + DirectivePushQueue.put6003(sch.getClZbh(), 1, Integer.parseInt(sch.getXlDir()), null, "进场@系统");
  290 + //directiveService.send60Operation(sch.getClZbh(), 1, Integer.parseInt(sch.getXlDir()), null, "进场@系统");
288 291 }
289 292 }
290 293 else {
... ...
src/main/java/com/bsth/data/gpsdata/recovery/GpsDataRecovery.java
... ... @@ -164,7 +164,7 @@ public class GpsDataRecovery implements ApplicationContextAware {
164 164 //abnormalStateHandle.handle(gps, prevs);
165 165  
166 166 if(!task)
167   - return; //无任务的,到这里就结束
  167 + continue; //无任务的,到这里就结束
168 168  
169 169 //反向处理
170 170 reverseSignalHandle.handle(gps, prevs);
... ...
src/main/java/com/bsth/data/gpsdata/thread/ThreadPollMonitor.java deleted 100644 → 0
1   -package com.bsth.data.gpsdata.thread;
2   -
3   -import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
4   -import org.springframework.stereotype.Component;
5   -
6   -/**
7   - * 线程池监听
8   - * Created by panzhao on 2017/5/10.
9   - */
10   -@Component
11   -public class ThreadPollMonitor extends Thread{
12   -
13   - @Override
14   - public void run() {
15   - long t = System.currentTimeMillis();
16   -
17   - if(t - GpsRealAnalyse.st > 3000 * 10){
18   - GpsRealAnalyse.shutdown();
19   - }
20   - }
21   -}
src/main/java/com/bsth/data/msg_queue/DirectivePushQueue.java 0 → 100644
  1 +package com.bsth.data.msg_queue;
  2 +
  3 +import com.bsth.entity.realcontrol.ScheduleRealInfo;
  4 +import com.bsth.service.directive.DirectiveService;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +import org.springframework.beans.BeansException;
  8 +import org.springframework.boot.CommandLineRunner;
  9 +import org.springframework.context.ApplicationContext;
  10 +import org.springframework.context.ApplicationContextAware;
  11 +import org.springframework.stereotype.Component;
  12 +
  13 +import java.util.LinkedList;
  14 +
  15 +/**
  16 + * 到网关的指令推送队列 (系统发送的队列, 用户手动发送的不走这里)
  17 + * Created by panzhao on 2017/5/11.
  18 + */
  19 +@Component
  20 +public class DirectivePushQueue implements CommandLineRunner, ApplicationContextAware {
  21 +
  22 + static LinkedList<QueueData_Directive> linkedList;
  23 + static DataPushThread thread;
  24 + static DirectiveService directiveService;
  25 + static long t;
  26 + static final int IDLE_TIME = 1000 * 30;
  27 +
  28 + static {
  29 + linkedList = new LinkedList<>();
  30 + }
  31 +
  32 + public static void put6002(ScheduleRealInfo sch, int finish, String sender){
  33 + QueueData_Directive qd6002 = new QueueData_Directive();
  34 + qd6002.setSch(sch);
  35 + qd6002.setFinish(finish);
  36 + qd6002.setSender(sender);
  37 + qd6002.setCode("60_02");
  38 +
  39 + linkedList.add(qd6002);
  40 + }
  41 +
  42 + public static void put6003(String nbbm, int state, int upDown, ScheduleRealInfo sch, String sender){
  43 + QueueData_Directive qd6003 = new QueueData_Directive();
  44 + qd6003.setNbbm(nbbm);
  45 + qd6003.setState(state);
  46 + qd6003.setUpDown(upDown);
  47 + qd6003.setSch(sch);
  48 + qd6003.setSender(sender);
  49 +
  50 + qd6003.setCode("60_03");
  51 +
  52 + linkedList.add(qd6003);
  53 + }
  54 +
  55 + public static void put64(String nbbm, String lineCode, String sender){
  56 + QueueData_Directive qd64 = new QueueData_Directive();
  57 + qd64.setNbbm(nbbm);
  58 + qd64.setLineCode(lineCode);
  59 + qd64.setSender(sender);
  60 +
  61 + qd64.setCode("64");
  62 +
  63 + linkedList.add(qd64);
  64 + }
  65 +
  66 + public static boolean isIdle(){
  67 + return System.currentTimeMillis() - t > IDLE_TIME;
  68 + }
  69 +
  70 + public static void start(){
  71 + if(thread != null){
  72 + thread.interrupt();
  73 + }
  74 + thread = new DataPushThread();
  75 + thread.start();
  76 + }
  77 +
  78 + @Override
  79 + public void run(String... strings) throws Exception {
  80 + start();
  81 + }
  82 +
  83 + @Override
  84 + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  85 + directiveService = applicationContext.getBean(DirectiveService.class);
  86 + }
  87 +
  88 + public static class DataPushThread extends Thread {
  89 +
  90 + Logger log = LoggerFactory.getLogger(this.getClass());
  91 +
  92 + @Override
  93 + public void run() {
  94 + boolean sleepFlag = false;
  95 + QueueData_Directive qd;
  96 + String code;
  97 + while (true) {
  98 + try {
  99 + qd = linkedList.pollFirst();
  100 + if (qd != null) {
  101 + sleepFlag = false;
  102 + code = qd.getCode();
  103 +
  104 + if(code.equals("60_02")){
  105 + directiveService.send60Dispatch(qd.getSch(), qd.getFinish(), qd.getSender());
  106 + log.info("directive 60_02 sch id: " + qd.getSch().getId());
  107 + }
  108 + else if(code.equals("60_03")){
  109 + directiveService.send60Operation(qd.getNbbm(), qd.getState(), qd.getUpDown(), null, qd.getSender());
  110 + log.info("directive 60_03 nbbm: " + qd.getNbbm());
  111 + }
  112 + else if(code.equals("64")){
  113 + directiveService.lineChange(qd.getNbbm(), qd.getLineCode(), qd.getSender());
  114 + log.info("directive 64 nbbm: " + qd.getNbbm() + " lineCode: " + qd.getLineCode());
  115 + }
  116 +
  117 + } else{
  118 + Thread.sleep(500);
  119 + if(!sleepFlag){
  120 + log.info("sleep...");
  121 + sleepFlag = true;
  122 + }
  123 + }
  124 + t = System.currentTimeMillis();
  125 + }
  126 + catch(InterruptedException e){
  127 + break;
  128 + }
  129 + catch (Exception e) {
  130 + log.error("", e);
  131 + }
  132 +
  133 + }
  134 + }
  135 + }
  136 +}
... ...
src/main/java/com/bsth/data/msg_queue/QueueData.java 0 → 100644
  1 +package com.bsth.data.msg_queue;
  2 +
  3 +import org.springframework.web.socket.TextMessage;
  4 +import org.springframework.web.socket.WebSocketSession;
  5 +
  6 +/**
  7 + * Created by panzhao on 2017/5/11.
  8 + */
  9 +public class QueueData {
  10 +
  11 + private TextMessage message;
  12 +
  13 + private WebSocketSession session;
  14 +
  15 +
  16 + public WebSocketSession getSession() {
  17 + return session;
  18 + }
  19 +
  20 + public void setSession(WebSocketSession session) {
  21 + this.session = session;
  22 + }
  23 +
  24 + public TextMessage getMessage() {
  25 + return message;
  26 + }
  27 +
  28 + public void setMessage(TextMessage message) {
  29 + this.message = message;
  30 + }
  31 +}
... ...
src/main/java/com/bsth/data/msg_queue/QueueData_Directive.java 0 → 100644
  1 +package com.bsth.data.msg_queue;
  2 +
  3 +import com.bsth.entity.realcontrol.ScheduleRealInfo;
  4 +
  5 +/**
  6 + * Created by panzhao on 2017/5/11.
  7 + */
  8 +public class QueueData_Directive {
  9 +
  10 + /**
  11 + * 指令类型
  12 + * 60_02
  13 + * 60_03
  14 + * 64
  15 + */
  16 + private String code;
  17 +
  18 + /** 60调度指令内容 60_02*/
  19 + private ScheduleRealInfo sch;
  20 + private int finish;
  21 +
  22 + /** 60 营运指令 60_03*/
  23 + private String nbbm;
  24 + private int state;
  25 + private int upDown;
  26 +
  27 + /** 64指令内容 */
  28 + private String lineCode;
  29 +
  30 + private String sender;
  31 +
  32 +
  33 + public ScheduleRealInfo getSch() {
  34 + return sch;
  35 + }
  36 +
  37 + public void setSch(ScheduleRealInfo sch) {
  38 + this.sch = sch;
  39 + }
  40 +
  41 + public int getFinish() {
  42 + return finish;
  43 + }
  44 +
  45 + public void setFinish(int finish) {
  46 + this.finish = finish;
  47 + }
  48 +
  49 + public String getSender() {
  50 + return sender;
  51 + }
  52 +
  53 + public void setSender(String sender) {
  54 + this.sender = sender;
  55 + }
  56 +
  57 + public String getNbbm() {
  58 + return nbbm;
  59 + }
  60 +
  61 + public void setNbbm(String nbbm) {
  62 + this.nbbm = nbbm;
  63 + }
  64 +
  65 + public int getState() {
  66 + return state;
  67 + }
  68 +
  69 + public void setState(int state) {
  70 + this.state = state;
  71 + }
  72 +
  73 + public int getUpDown() {
  74 + return upDown;
  75 + }
  76 +
  77 + public void setUpDown(int upDown) {
  78 + this.upDown = upDown;
  79 + }
  80 +
  81 + public String getCode() {
  82 + return code;
  83 + }
  84 +
  85 + public void setCode(String code) {
  86 + this.code = code;
  87 + }
  88 +
  89 + public String getLineCode() {
  90 + return lineCode;
  91 + }
  92 +
  93 + public void setLineCode(String lineCode) {
  94 + this.lineCode = lineCode;
  95 + }
  96 +}
... ...
src/main/java/com/bsth/data/msg_queue/WebSocketPushQueue.java 0 → 100644
  1 +package com.bsth.data.msg_queue;
  2 +
  3 +import com.bsth.common.Constants;
  4 +import org.slf4j.Logger;
  5 +import org.slf4j.LoggerFactory;
  6 +import org.springframework.boot.CommandLineRunner;
  7 +import org.springframework.stereotype.Component;
  8 +import org.springframework.web.socket.TextMessage;
  9 +import org.springframework.web.socket.WebSocketSession;
  10 +
  11 +import java.util.LinkedList;
  12 +
  13 +/**
  14 + * 线调web socket 推送队列
  15 + * Created by panzhao on 2017/5/11.
  16 + */
  17 +@Component
  18 +public class WebSocketPushQueue implements CommandLineRunner {
  19 +
  20 + static LinkedList<QueueData> linkedList;
  21 + static DataPushThread thread;
  22 + static Logger log = LoggerFactory.getLogger(WebSocketPushQueue.class);
  23 + static long t;
  24 + static final int IDLE_TIME = 1000 * 30;
  25 +
  26 + static {
  27 + linkedList = new LinkedList();
  28 + }
  29 +
  30 + public static boolean isIdle() {
  31 + return System.currentTimeMillis() - t > IDLE_TIME;
  32 + }
  33 +
  34 + public static void put(WebSocketSession session, TextMessage msg) {
  35 + QueueData qd = new QueueData();
  36 + qd.setMessage(msg);
  37 + qd.setSession(session);
  38 +
  39 + log.info("put、[" + session.getAttributes().get(Constants.SESSION_USERNAME) + "] 、" + msg.getPayload());
  40 + linkedList.add(qd);
  41 + }
  42 +
  43 + public static void start() {
  44 + if (thread != null) {
  45 + thread.interrupt();
  46 + }
  47 + thread = new DataPushThread();
  48 + thread.start();
  49 + }
  50 +
  51 + @Override
  52 + public void run(String... strings) throws Exception {
  53 + start();
  54 + }
  55 +
  56 + public static class DataPushThread extends Thread {
  57 +
  58 + Logger log = LoggerFactory.getLogger(this.getClass());
  59 +
  60 + @Override
  61 + public void run() {
  62 + QueueData qd;
  63 + WebSocketSession session;
  64 +
  65 + boolean sleepFlag = false;
  66 + while (true) {
  67 + try {
  68 + qd = linkedList.pollFirst();
  69 + if (qd != null) {
  70 + sleepFlag = false;
  71 + session = qd.getSession();
  72 + if (session.isOpen()) {
  73 + log.info("push start、[" + session.getAttributes().get(Constants.SESSION_USERNAME) + "] 、" + qd.getMessage().getPayload());
  74 + session.sendMessage(qd.getMessage());
  75 + log.info("push end..");
  76 + }
  77 + } else {
  78 + Thread.sleep(500);
  79 + if (!sleepFlag) {
  80 + log.info("sleep...");
  81 + sleepFlag = true;
  82 + }
  83 + }
  84 + t = System.currentTimeMillis();
  85 + } catch (InterruptedException e) {
  86 + break;
  87 + } catch (Exception e) {
  88 + log.error("", e);
  89 + }
  90 + }
  91 + }
  92 + }
  93 +}
... ...
src/main/java/com/bsth/data/pilot80/PilotReport.java
... ... @@ -4,6 +4,7 @@ import com.bsth.data.BasicData;
4 4 import com.bsth.data.LineConfigData;
5 5 import com.bsth.data.gpsdata.GpsEntity;
6 6 import com.bsth.data.gpsdata.GpsRealData;
  7 +import com.bsth.data.msg_queue.DirectivePushQueue;
7 8 import com.bsth.data.schedule.DayOfSchedule;
8 9 import com.bsth.entity.Line;
9 10 import com.bsth.entity.directive.D80;
... ... @@ -82,9 +83,11 @@ public class PilotReport {
82 83 outSch = dayOfSchedule.next(outSch);
83 84  
84 85 //下发调度指令
85   - directiveService.send60Dispatch(outSch, dayOfSchedule.doneSum(nbbm), "请出@系统");
  86 + DirectivePushQueue.put6002(outSch, dayOfSchedule.doneSum(nbbm), "请出@系统");
  87 + //directiveService.send60Dispatch(outSch, dayOfSchedule.doneSum(nbbm), "请出@系统");
86 88 //下发线路切换指令
87   - directiveService.lineChange(outSch.getClZbh(), outSch.getXlBm(), "请出@系统");
  89 + DirectivePushQueue.put64(outSch.getClZbh(), outSch.getXlBm(), "请出@系统");
  90 + //directiveService.lineChange(outSch.getClZbh(), outSch.getXlBm(), "请出@系统");
88 91 }else
89 92 d80.setRemarks("没有出场计划");
90 93  
... ...
src/main/java/com/bsth/data/schedule/DayOfSchedule.java
... ... @@ -7,6 +7,7 @@ import com.bsth.common.Constants;
7 7 import com.bsth.common.ResponseCode;
8 8 import com.bsth.data.BasicData;
9 9 import com.bsth.data.LineConfigData;
  10 +import com.bsth.data.ThreadMonotor;
10 11 import com.bsth.data.directive.DirectivesPstThread;
11 12 import com.bsth.data.gpsdata.GpsRealData;
12 13 import com.bsth.data.gpsdata.recovery.GpsDataRecovery;
... ... @@ -139,6 +140,9 @@ public class DayOfSchedule implements CommandLineRunner {
139 140 @Autowired
140 141 CalcOilThread calcOilThread;
141 142  
  143 + @Autowired
  144 + ThreadMonotor threadMonotor;
  145 +
142 146 private static DateTimeFormatter fmtyyyyMMdd = DateTimeFormat.forPattern("yyyy-MM-dd"), fmtHHmm = DateTimeFormat.forPattern("HH:mm");
143 147  
144 148 @Override
... ... @@ -165,6 +169,9 @@ public class DayOfSchedule implements CommandLineRunner {
165 169  
166 170 //指令持久化线程
167 171 Application.mainServices.scheduleWithFixedDelay(directivesPstThread, 180, 180, TimeUnit.SECONDS);
  172 +
  173 + //监听
  174 + Application.mainServices.scheduleWithFixedDelay(threadMonotor, 120, 60, TimeUnit.SECONDS);
168 175 }
169 176  
170 177 //数据恢复
... ...
src/main/java/com/bsth/service/directive/DirectiveServiceImpl.java
... ... @@ -85,12 +85,15 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
85 85 if(null != sender)
86 86 d60.setSender(sender);
87 87 d60.setHttpCode(code);
88   - // 添加到缓存
89   - dayOfDirectives.put60(d60);
90 88  
91   - if (code != 0) {
  89 + if (code == 0) {
  90 + // 添加到缓存
  91 + dayOfDirectives.put60(d60, true);
  92 + }
  93 + else{
92 94 d60.setErrorText("网关通讯失败, code: " + code);
93 95 d60Repository.save(d60);
  96 + dayOfDirectives.put60(d60, false);
94 97 }
95 98 return code;
96 99 }
... ... @@ -139,12 +142,13 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
139 142 if (code == 0) {
140 143 sch.setDirectiveState(60);
141 144 // 添加到缓存,延迟入库
142   - dayOfDirectives.put60(d60);
  145 + dayOfDirectives.put60(d60, true);
143 146 // 通知页面
144 147 sendD60ToPage(sch);
145 148 }
146 149 else{
147 150 d60.setErrorText("网关通讯失败, code: " + code);
  151 + dayOfDirectives.put60(d60, false);
148 152 d60Repository.save(d60);
149 153 }
150 154 return code;
... ... @@ -196,11 +200,15 @@ public class DirectiveServiceImpl extends BaseServiceImpl&lt;D60, Integer&gt; implemen
196 200 d60.setHttpCode(code);
197 201 if (null != sch)
198 202 d60.setSch(sch);
199   - dayOfDirectives.put60(d60);
200 203  
201   - if (code != 0) {
  204 +
  205 + if (code == 0) {
  206 + dayOfDirectives.put60(d60, true);
  207 + }
  208 + else{
202 209 d60.setErrorText("网关通讯失败, code: " + code);
203 210 d60Repository.save(d60);
  211 + dayOfDirectives.put60(d60, false);
204 212 }
205 213 return code;
206 214 }
... ...
src/main/java/com/bsth/service/realcontrol/impl/ScheduleRealInfoServiceImpl.java
... ... @@ -10,6 +10,7 @@ import com.bsth.controller.realcontrol.dto.DfsjChange;
10 10 import com.bsth.controller.realcontrol.dto.LpData;
11 11 import com.bsth.data.BasicData;
12 12 import com.bsth.data.LineConfigData;
  13 +import com.bsth.data.msg_queue.DirectivePushQueue;
13 14 import com.bsth.data.schedule.DayOfSchedule;
14 15 import com.bsth.data.schedule.SchAttrCalculator;
15 16 import com.bsth.data.schedule.SchModifyLog;
... ... @@ -1280,11 +1281,12 @@ public class ScheduleRealInfoServiceImpl extends BaseServiceImpl&lt;ScheduleRealInf
1280 1281 ts.add(next);
1281 1282 }
1282 1283  
1283   - //车辆下一个要执行的班次
1284 1284 try{
  1285 + //车辆下一个要执行的班次
1285 1286 ScheduleRealInfo carNext = dayOfSchedule.next(sch);
1286 1287 if(carNext != null && !carNext.getXlBm().equals(sch.getXlBm())){
1287   - directiveService.lineChange(carNext.getClZbh(), carNext.getXlBm(), "套跑@系统");
  1288 + DirectivePushQueue.put64(carNext.getClZbh(), carNext.getXlBm(), "套跑@系统");
  1289 + //directiveService.lineChange(carNext.getClZbh(), carNext.getXlBm(), "套跑@系统");
1288 1290 }
1289 1291 }catch (Exception e){logger.error("", e);}
1290 1292  
... ...
src/main/java/com/bsth/websocket/handler/RealControlSocketHandler.java
1 1 package com.bsth.websocket.handler;
2 2  
3 3 import com.alibaba.fastjson.JSONObject;
4   -import com.bsth.common.Constants;
5 4 import com.bsth.data.BasicData;
  5 +import com.bsth.data.msg_queue.WebSocketPushQueue;
6 6 import com.google.common.base.Splitter;
7 7 import org.slf4j.Logger;
8 8 import org.slf4j.LoggerFactory;
... ... @@ -103,7 +103,8 @@ public class RealControlSocketHandler implements WebSocketHandler {
103 103 return;
104 104  
105 105 for(WebSocketSession user : list){
106   - try {
  106 + WebSocketPushQueue.put(user, message);
  107 + /*try {
107 108 if (user.isOpen()) {
108 109 user.sendMessage(message);
109 110 }
... ... @@ -114,7 +115,7 @@ public class RealControlSocketHandler implements WebSocketHandler {
114 115 catch(Exception e2){}
115 116 logger.error("sendMessageToLine error ...."+msg);
116 117 logger.error("sendMessageToLine error ....", e);
117   - }
  118 + }*/
118 119 }
119 120 }
120 121  
... ...
src/main/resources/logback.xml
... ... @@ -182,6 +182,48 @@
182 182 <appender-ref ref="GPS_COUNT" />
183 183 </logger>
184 184  
  185 + <!-- 消息队列纪录 -->
  186 + <appender name="QUEUE_WEB_SOCKET"
  187 + class="ch.qos.logback.core.rolling.RollingFileAppender">
  188 + <file>${LOG_BASE}/msg_queue/websocket.log</file>
  189 + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  190 + <fileNamePattern>${LOG_BASE}/msg_queue/websocket-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
  191 + <timeBasedFileNamingAndTriggeringPolicy
  192 + class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
  193 + <maxFileSize>100MB</maxFileSize>
  194 + </timeBasedFileNamingAndTriggeringPolicy>
  195 + </rollingPolicy>
  196 +
  197 + <layout class="ch.qos.logback.classic.PatternLayout">
  198 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%file:%line] %-5level -%msg%n
  199 + </pattern>
  200 + </layout>
  201 + </appender>
  202 + <logger name="com.bsth.data.msg_queue.WebSocketPushQueue"
  203 + level="INFO" additivity="false">
  204 + <appender-ref ref="QUEUE_WEB_SOCKET" />
  205 + </logger>
  206 + <appender name="QUEUE_DIRECTIVE"
  207 + class="ch.qos.logback.core.rolling.RollingFileAppender">
  208 + <file>${LOG_BASE}/msg_queue/directive.log</file>
  209 + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  210 + <fileNamePattern>${LOG_BASE}/msg_queue/directive-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
  211 + <timeBasedFileNamingAndTriggeringPolicy
  212 + class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
  213 + <maxFileSize>100MB</maxFileSize>
  214 + </timeBasedFileNamingAndTriggeringPolicy>
  215 + </rollingPolicy>
  216 +
  217 + <layout class="ch.qos.logback.classic.PatternLayout">
  218 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%file:%line] %-5level -%msg%n
  219 + </pattern>
  220 + </layout>
  221 + </appender>
  222 + <logger name="com.bsth.data.msg_queue.DirectivePushQueue"
  223 + level="INFO" additivity="false">
  224 + <appender-ref ref="QUEUE_DIRECTIVE" />
  225 + </logger>
  226 +
185 227 <!--<logger name="org.hibernate.SQL" level="TRACE">-->
186 228 <!--<appender-ref ref="STDOUT" />-->
187 229 <!--</logger>-->
... ...
src/main/resources/static/real_control_v2/fragments/line_schedule/context_menu/fcxxwt.html
... ... @@ -116,10 +116,10 @@
116 116 <label class="uk-form-label" >调整说明</label>
117 117 <div class="uk-form-controls">
118 118 <select name="adjustExps">
119   - <option value="">请选择..</option>
120   - {{each adjustExps as exp i}}
  119 + <option value="">请选择..</option>
  120 + {{each adjustExps as exp i}}
121 121 <option value="{{exp}}">{{exp}}</option>
122   - {{/each}}
  122 + {{/each}}
123 123 </select>
124 124 </div>
125 125 </div>
... ... @@ -129,7 +129,7 @@
129 129 <div class="uk-width-1-1">
130 130 <div class="uk-form-row ct-stacked">
131 131 <div class="uk-form-controls" style="margin-top: 5px;">
132   - <textarea id="form-s-t" cols="30" rows="5" name="remarks" data-fv-stringlength="true" data-fv-stringlength-max="50" placeholder="备注,不超过50个字符">{{sch.remarks}}</textarea>
  132 + <textarea id="form-s-t" cols="30" rows="5" required name="remarks" data-fv-stringlength="true" data-fv-stringlength-max="50" placeholder="备注,不超过50个字符">{{sch.remarks}}</textarea>
133 133 </div>
134 134 </div>
135 135 </div>
... ...
src/main/resources/static/real_control_v2/fragments/line_schedule/context_menu/sftz.html
... ... @@ -54,7 +54,7 @@
54 54 <div class="uk-form-row ct-stacked">
55 55 <label class="uk-form-label" for="form-s-t">调整说明<small class="font-danger">(不超过20个字符)</small></label>
56 56 <div class="uk-form-controls">
57   - <textarea id="form-s-t" cols="30" rows="5" name="remarks" data-fv-stringlength="true" data-fv-stringlength-max="20" placeholder="不超过20个字符。非必填"></textarea>
  57 + <textarea id="form-s-t" cols="30" rows="5" name="remarks" required data-fv-stringlength="true" data-fv-stringlength-max="20" placeholder="不超过20个字符。必填"></textarea>
58 58 </div>
59 59 </div>
60 60 </div>
... ...
src/main/resources/static/real_control_v2/js/main.js
... ... @@ -169,8 +169,8 @@ var disabled_submit_btn = function (form) {
169 169 function showUpdateDescription() {
170 170 //更新说明
171 171 var updateDescription = {
172   - date: '2017-05-09',
173   - text: '<h5>恢复到了最新功能版本,并修复了临加时没有保存售票员的问题。</h5>'
  172 + date: '2017-05-12',
  173 + text: '<h5>现在班次调整时,备注是必填项</h5><h5>现在中途更换设备后,需要刷新缓存数据后,再刷新线调页面。</h5><h5>分班套跑车辆将在上午最后一个班次完成后发送线路切换指令。捕捉到出场信号时也会切换至出场班次所在线路,驾驶员请求出场时候也会同步一次状态</h5><h5>修复了一些其他问题。</h5>'
174 174 };
175 175  
176 176 var storage = window.localStorage
... ...
src/main/resources/static/real_control_v2/mapmonitor/fragments/playback_v2/main.html
... ... @@ -864,6 +864,8 @@
864 864 if(typeof(upDown) == "undefined")
865 865 upDown = roadUpdown;
866 866  
  867 + if(upDown!= 0 && upDown != 1)
  868 + return;
867 869 map.clearOverlays();
868 870 parkPolygons = {};//清除停车场对照
869 871 //路段
... ...