Commit 51efce95ff5bea0db5a0fd591a30916e6c0b4ffc

Authored by 潘钊
1 parent d9cd62fc

update...

src/main/java/com/bsth/controller/realcontrol/anomalyCheckController.java
1 1 package com.bsth.controller.realcontrol;
2 2  
  3 +import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
3 4 import com.bsth.data.schedule.DayOfSchedule;
4 5 import com.bsth.entity.realcontrol.ScheduleRealInfo;
5 6 import org.slf4j.Logger;
... ... @@ -49,4 +50,9 @@ public class anomalyCheckController {
49 50 dayOfSchedule.replaceByNbbm(nbbm, map.values());
50 51 }
51 52 }
  53 +
  54 + @RequestMapping(value = "/shutdownThreadPool")
  55 + public void shutdownThreadPool(){
  56 + GpsRealAnalyse.shutdown();
  57 + }
52 58 }
... ...
src/main/java/com/bsth/data/directive/GatewayHttpUtils.java
1 1 package com.bsth.data.directive;
2 2  
3   -import java.io.IOException;
4   -
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.bsth.util.ConfigUtil;
5 5 import org.apache.http.client.config.RequestConfig;
6 6 import org.apache.http.client.methods.CloseableHttpResponse;
7 7 import org.apache.http.client.methods.HttpPost;
... ... @@ -12,8 +12,7 @@ import org.apache.http.util.EntityUtils;
12 12 import org.slf4j.Logger;
13 13 import org.slf4j.LoggerFactory;
14 14  
15   -import com.alibaba.fastjson.JSONObject;
16   -import com.bsth.util.ConfigUtil;
  15 +import java.io.IOException;
17 16  
18 17 /**
19 18 *
... ... @@ -42,7 +41,7 @@ public class GatewayHttpUtils {
42 41  
43 42 //超时时间
44 43 RequestConfig requestConfig = RequestConfig.custom()
45   - .setConnectTimeout(3000).setConnectionRequestTimeout(1000)
  44 + .setConnectTimeout(3000).setConnectionRequestTimeout(1000)
46 45 .setSocketTimeout(3000).build();
47 46  
48 47 HttpPost post = new HttpPost(url);
... ...
src/main/java/com/bsth/data/gpsdata/GpsRealData.java
... ... @@ -4,6 +4,7 @@ import com.bsth.data.BasicData;
4 4 import com.bsth.data.forecast.ForecastRealServer;
5 5 import com.bsth.data.gpsdata.thread.GpsDataLoaderThread;
6 6 import com.bsth.data.gpsdata.thread.OfflineMonitorThread;
  7 +import com.bsth.data.gpsdata.thread.ThreadPollMonitor;
7 8 import com.bsth.data.schedule.DayOfSchedule;
8 9 import com.bsth.entity.realcontrol.ScheduleRealInfo;
9 10 import com.google.common.collect.TreeMultimap;
... ... @@ -46,6 +47,9 @@ public class GpsRealData implements CommandLineRunner {
46 47 @Autowired
47 48 ForecastRealServer forecastRealServer;
48 49  
  50 + @Autowired
  51 + ThreadPollMonitor threadPollMonitor;
  52 +
49 53 /**
50 54 * 构造函数
51 55 */
... ... @@ -61,6 +65,10 @@ public class GpsRealData implements CommandLineRunner {
61 65 //Application.mainServices.scheduleWithFixedDelay(gpsDataLoader, 20, 2, TimeUnit.SECONDS);
62 66 //定时扫描掉离线
63 67 //Application.mainServices.scheduleWithFixedDelay(offlineMonitorThread, 120, 60, TimeUnit.SECONDS);
  68 +
  69 + //扫描GPS线程池状态
  70 + //Application.mainServices.scheduleWithFixedDelay(threadPollMonitor, 60, 20, TimeUnit.SECONDS);
  71 +
64 72 }
65 73  
66 74  
... ...
src/main/java/com/bsth/data/gpsdata/arrival/GpsRealAnalyse.java
... ... @@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
26 26 @Component
27 27 public class GpsRealAnalyse {
28 28  
29   - Logger logger = LoggerFactory.getLogger(this.getClass());
  29 + static Logger logger = LoggerFactory.getLogger(GpsRealAnalyse.class);
30 30  
31 31 @Autowired
32 32 OfflineSignalHandle offlineSignalHandle;
... ... @@ -46,40 +46,54 @@ public class GpsRealAnalyse {
46 46  
47 47 static ExecutorService threadPool = Executors.newFixedThreadPool(100);
48 48  
  49 + public static long st;
  50 + public static CountDownLatch count;
  51 +
49 52 public void analyse(List<GpsEntity> list) {
50   - //如果正在恢复数据
51   - if (GpsDataRecovery.run)
52   - return;
53   -
54   - long t = System.currentTimeMillis();
55   - logger.info("analyse gps size: " + list.size());
56   - //按线路分组gps
57   - ArrayListMultimap multimap = ArrayListMultimap.create();
58   - for(GpsEntity gps : list){
59   - multimap.put(gps.getLineId(), gps);
60   - }
  53 + try {
  54 + st = System.currentTimeMillis();
  55 + //如果正在恢复数据
  56 + if (GpsDataRecovery.run)
  57 + return;
  58 +
  59 + //按线路分组gps
  60 + ArrayListMultimap multimap = ArrayListMultimap.create();
  61 + for (GpsEntity gps : list) {
  62 + multimap.put(gps.getLineId(), gps);
  63 + }
61 64  
62   - Set<String> ks = multimap.keySet();
63   - CountDownLatch count = new CountDownLatch(ks.size());
  65 + Set<String> ks = multimap.keySet();
  66 + logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
  67 + count = new CountDownLatch(ks.size());
64 68  
65   - for(String lineCode : ks){
66   - threadPool.execute(new SignalHandleThread(multimap.get(lineCode), count));
67   - }
  69 + for (String lineCode : ks) {
  70 + threadPool.execute(new SignalHandleThread(multimap.get(lineCode), count));
  71 + }
68 72  
69   - try {
70 73 //等待子线程结束
71 74 count.await();
72 75  
73 76 //加入实时gps对照
74   - for(GpsEntity gps: list)
  77 + for (GpsEntity gps : list)
75 78 gpsRealData.put(gps);
76 79  
77   - logger.info("time , " + (System.currentTimeMillis() - t));
78   - } catch (InterruptedException e) {
  80 + logger.info("time , " + (System.currentTimeMillis() - st));
  81 + } catch (Exception e) {
79 82 logger.error("", e);
80 83 }
81 84 }
82 85  
  86 + public static void shutdown() {
  87 + logger.warn("GpsRealAnalyse shutdown!!");
  88 + threadPool.shutdownNow();
  89 + long len = count.getCount();
  90 + for (int i = 0; i < len; i++) {
  91 + count.countDown();
  92 + }
  93 +
  94 + threadPool = Executors.newFixedThreadPool(100);
  95 + }
  96 +
83 97 static GpsComp comp = new GpsComp();
84 98  
85 99 public class SignalHandleThread implements Runnable {
... ... @@ -97,7 +111,7 @@ public class GpsRealAnalyse {
97 111  
98 112 try {
99 113 Collections.sort(list, comp);
100   - for(GpsEntity gps : list){
  114 + for (GpsEntity gps : list) {
101 115 //是否有任务
102 116 boolean task;
103 117 CircleQueue<GpsEntity> prevs = GeoCacheData.getGps(gps.getNbbm());
... ... @@ -110,7 +124,7 @@ public class GpsRealAnalyse {
110 124 //异常判定(越界/超速)
111 125 abnormalStateHandle.handle(gps, prevs);
112 126  
113   - if(!task)
  127 + if (!task)
114 128 return; //无任务的,到这里就结束
115 129  
116 130 //反向处理
... ... @@ -122,7 +136,7 @@ public class GpsRealAnalyse {
122 136 } catch (Exception e) {
123 137 logger.error("", e);
124 138 } finally {
125   - if(count != null)
  139 + if (count != null)
126 140 count.countDown();
127 141 }
128 142 }
... ...
src/main/java/com/bsth/data/gpsdata/thread/GpsDataLoaderThread.java
... ... @@ -101,9 +101,9 @@ public class GpsDataLoaderThread extends Thread {
101 101  
102 102 old = gpsRealData.get(gps.getDeviceId());
103 103 if (old != null &&
104   - old.getTimestamp() == gps.getTimestamp() &&
105   - old.getLat() == gps.getLat() &&
106   - old.getLon() == gps.getLon())
  104 + old.getTimestamp().equals(gps.getTimestamp()) &&
  105 + old.getLat().equals(gps.getLat()) &&
  106 + old.getLon().equals(gps.getLon()))
107 107 continue;
108 108  
109 109 nbbm = BasicData.deviceId2NbbmMap.get(gps.getDeviceId());
... ...
src/main/java/com/bsth/data/gpsdata/thread/ThreadPollMonitor.java 0 → 100644
  1 +package com.bsth.data.gpsdata.thread;
  2 +
  3 +import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
  4 +import org.springframework.stereotype.Component;
  5 +
  6 +/**
  7 + * 线程池监听
  8 + * Created by panzhao on 2017/5/10.
  9 + */
  10 +@Component
  11 +public class ThreadPollMonitor extends Thread{
  12 +
  13 + @Override
  14 + public void run() {
  15 + long t = System.currentTimeMillis();
  16 +
  17 + if(t - GpsRealAnalyse.st > 3000 * 10){
  18 + GpsRealAnalyse.shutdown();
  19 + }
  20 + }
  21 +}
... ...
src/main/java/com/bsth/data/schedule/DayOfSchedule.java
... ... @@ -833,6 +833,29 @@ public class DayOfSchedule implements CommandLineRunner {
833 833 return sch;
834 834 }
835 835  
  836 + /**
  837 + * 搜索离当前时间最近的一个指定类型的班次
  838 + * @param nbbm
  839 + * @param bcType
  840 + * @return
  841 + */
  842 + public ScheduleRealInfo searchNearByBcType(String nbbm, String bcType){
  843 + List<ScheduleRealInfo> list = findByBcType(nbbm, bcType);
  844 + Collections.sort(list, schFCSJComparator);
  845 +
  846 + long t = System.currentTimeMillis();
  847 + int distance=-1;
  848 +
  849 + ScheduleRealInfo sch = null;
  850 + for(ScheduleRealInfo temp : list){
  851 +
  852 + if(Math.abs(temp.getDfsjT() - t) < distance || distance == -1){
  853 + sch = temp;
  854 + }
  855 + }
  856 + return sch;
  857 + }
  858 +
836 859 public List<ScheduleRealInfo> findByBcType(String nbbm, String bcType) {
837 860 List<ScheduleRealInfo> all = nbbmScheduleMap.get(nbbm), outList = new ArrayList<>();
838 861  
... ...
src/main/java/com/bsth/data/schedule/ScheduleComparator.java
1 1 package com.bsth.data.schedule;
2 2  
3   -import java.util.Comparator;
4   -
5 3 import com.bsth.entity.realcontrol.ScheduleRealInfo;
6 4  
  5 +import java.util.Comparator;
  6 +
7 7 /**
8 8 *
9 9 * @ClassName: ScheduleComparator
... ... @@ -25,7 +25,7 @@ public class ScheduleComparator {
25 25  
26 26 @Override
27 27 public int compare(ScheduleRealInfo s1, ScheduleRealInfo s2) {
28   - return (int) (s1.getDfsjT() - s2.getDfsjT());
  28 + return (int) (s1.getFcsjT() - s2.getFcsjT());
29 29 }
30 30 }
31 31 }
... ...
src/main/java/com/bsth/websocket/handler/RealControlSocketHandler.java
... ... @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONObject;
4 4 import com.bsth.common.Constants;
5 5 import com.bsth.data.BasicData;
6 6 import com.google.common.base.Splitter;
7   -import com.google.common.collect.ArrayListMultimap;
8 7 import org.slf4j.Logger;
9 8 import org.slf4j.LoggerFactory;
10 9 import org.springframework.context.annotation.Scope;
... ... @@ -12,6 +11,7 @@ import org.springframework.stereotype.Component;
12 11 import org.springframework.web.socket.*;
13 12  
14 13 import java.util.*;
  14 +import java.util.concurrent.ConcurrentHashMap;
15 15  
16 16 /**
17 17 * @author PanZhao
... ... @@ -22,12 +22,14 @@ public class RealControlSocketHandler implements WebSocketHandler {
22 22  
23 23 Logger logger = LoggerFactory.getLogger(this.getClass());
24 24  
25   - private static final ArrayList<WebSocketSession> users;
26   - private static final ArrayListMultimap<String, WebSocketSession> listenMap;
  25 + private static ArrayList<WebSocketSession> users;
  26 + //private static final ArrayListMultimap<String, WebSocketSession> listenMap;
  27 + private static ConcurrentHashMap<String, List<WebSocketSession>> listenMap;
27 28  
28 29 static {
29 30 users = new ArrayList<WebSocketSession>();
30   - listenMap = ArrayListMultimap.create();
  31 + //listenMap = ArrayListMultimap.create();
  32 + listenMap = new ConcurrentHashMap();
31 33 }
32 34  
33 35 @Override
... ... @@ -35,19 +37,31 @@ public class RealControlSocketHandler implements WebSocketHandler {
35 37 throws Exception {
36 38 users.remove(session);
37 39 //清理监听
38   - Set<String> keys = listenMap.keySet();
39   - Map<String, WebSocketSession> remMap = new HashMap<>();
40   - for(String k : keys){
41   - if(listenMap.get(k).contains(session))
42   - remMap.put(k, session);
43   - }
  40 + //Set<String> keys = listenMap.keySet();
  41 + //Map<String, WebSocketSession> remMap = new HashMap<>();
  42 +
  43 + int vsCount=0;
  44 + Collection<List<WebSocketSession>> vs = listenMap.values();
  45 + for(List<WebSocketSession> list : vs){
  46 + list.remove(session);
44 47  
45   - Set<String> remSet = remMap.keySet();
  48 + vsCount += list.size();
  49 + }
  50 + /*List<WebSocketSession> vs;
  51 + for(String k : keys){
  52 + //vs = listenMap.get(k);
  53 + //vs.remove(session);
  54 + listenMap.get(k).remove()
  55 + *//*if(listenMap.get(k).contains(session))
  56 + remMap.put(k, session);*//*
  57 + }*/
  58 +
  59 + /*Set<String> remSet = remMap.keySet();
46 60 for(String k : remSet){
47 61 listenMap.remove(k, remMap.get(k));
48 62 logger.info("web socket close, remove listen K: "+ k);
49   - }
50   - logger.info("listen values size: " + listenMap.values().size() + " -CloseStatus:" + arg1);
  63 + }*/
  64 + logger.info("listen values size: " + vsCount + " -CloseStatus:" + arg1);
51 65 }
52 66  
53 67 @Override
... ... @@ -65,8 +79,12 @@ public class RealControlSocketHandler implements WebSocketHandler {
65 79 //注册线路监听
66 80 List<String> idx = Splitter.on(",").splitToList(jsonObj.getString("idx"));
67 81 for(String lineCode : idx){
68   - if(BasicData.lineCode2NameMap.containsKey(lineCode))
69   - listenMap.put(lineCode, session);
  82 + if(BasicData.lineCode2NameMap.containsKey(lineCode)){
  83 + if(!listenMap.containsKey(lineCode)){
  84 + listenMap.put(lineCode, new ArrayList<WebSocketSession>());
  85 + }
  86 + listenMap.get(lineCode).add(session);
  87 + }
70 88 }
71 89 break;
72 90  
... ... @@ -97,12 +115,11 @@ public class RealControlSocketHandler implements WebSocketHandler {
97 115 public void sendMessageToLine(String lineCode, String msg) {
98 116  
99 117 TextMessage message = new TextMessage(msg.getBytes());
  118 + List<WebSocketSession> list = listenMap.get(lineCode);
  119 + if(list == null || list.size() == 0)
  120 + return;
100 121  
101   - Iterator<WebSocketSession> iterator = listenMap.get(lineCode).iterator();
102   -
103   - WebSocketSession user;
104   - while(iterator.hasNext()){
105   - user = iterator.next();
  122 + for(WebSocketSession user : list){
106 123 try {
107 124 if (user.isOpen()) {
108 125 user.sendMessage(message);
... ...