Commit 3ebf183b21ad35889af9825c2abf059a153e7ab2

Authored by 潘钊
1 parent aecfc182

update...

src/main/java/com/bsth/controller/realcontrol/anomalyCheckController.java
@@ -51,10 +51,17 @@ public class anomalyCheckController { @@ -51,10 +51,17 @@ public class anomalyCheckController {
51 } 51 }
52 } 52 }
53 53
54 - @RequestMapping(value = "/gpsClientReconn", method = RequestMethod.POST) 54 + @Autowired
  55 + ClientApp clientApp;
  56 +
  57 + @RequestMapping(value = "/gpsClientDestroy", method = RequestMethod.POST)
55 public void gpsClientReconn(){ 58 public void gpsClientReconn(){
56 - ClientApp.pdreconn();  
57 - ClientApp.pfreconn(); 59 + clientApp.destroy();
  60 + }
  61 +
  62 + @RequestMapping(value = "/gpsClientInit", method = RequestMethod.POST)
  63 + public void gpsClientInit(){
  64 + clientApp.init();
58 } 65 }
59 66
60 @RequestMapping(value = "/pdClose", method = RequestMethod.POST) 67 @RequestMapping(value = "/pdClose", method = RequestMethod.POST)
src/main/java/com/bsth/data/gpsdata/GpsRealData.java
@@ -4,6 +4,7 @@ import com.bsth.Application; @@ -4,6 +4,7 @@ import com.bsth.Application;
4 import com.bsth.data.BasicData; 4 import com.bsth.data.BasicData;
5 import com.bsth.data.forecast.ForecastRealServer; 5 import com.bsth.data.forecast.ForecastRealServer;
6 import com.bsth.data.gpsdata.client.ClientApp; 6 import com.bsth.data.gpsdata.client.ClientApp;
  7 +import com.bsth.data.gpsdata.client.GpsBeforeBuffer;
7 import com.bsth.data.gpsdata.thread.GpsDataLoaderThread; 8 import com.bsth.data.gpsdata.thread.GpsDataLoaderThread;
8 import com.bsth.data.gpsdata.thread.OfflineMonitorThread; 9 import com.bsth.data.gpsdata.thread.OfflineMonitorThread;
9 import com.bsth.data.schedule.DayOfSchedule; 10 import com.bsth.data.schedule.DayOfSchedule;
@@ -58,6 +59,8 @@ public class GpsRealData implements CommandLineRunner { @@ -58,6 +59,8 @@ public class GpsRealData implements CommandLineRunner {
58 59
59 @Autowired 60 @Autowired
60 ClientApp clientApp; 61 ClientApp clientApp;
  62 + @Autowired
  63 + GpsBeforeBuffer gpsBeforeBuffer;
61 @Override 64 @Override
62 public void run(String... arg0) throws Exception { 65 public void run(String... arg0) throws Exception {
63 logger.info("gpsDataLoader,20,3"); 66 logger.info("gpsDataLoader,20,3");
@@ -68,6 +71,7 @@ public class GpsRealData implements CommandLineRunner { @@ -68,6 +71,7 @@ public class GpsRealData implements CommandLineRunner {
68 71
69 //gps 客户端 72 //gps 客户端
70 //clientApp.init(); 73 //clientApp.init();
  74 + //gpsBeforeBuffer.init();
71 } 75 }
72 76
73 77
src/main/java/com/bsth/data/gpsdata/arrival/GpsRealAnalyse.java
@@ -11,6 +11,8 @@ import org.slf4j.LoggerFactory; @@ -11,6 +11,8 @@ import org.slf4j.LoggerFactory;
11 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.stereotype.Component; 12 import org.springframework.stereotype.Component;
13 13
  14 +import java.util.Collections;
  15 +import java.util.Comparator;
14 import java.util.List; 16 import java.util.List;
15 import java.util.Set; 17 import java.util.Set;
16 import java.util.concurrent.CountDownLatch; 18 import java.util.concurrent.CountDownLatch;
@@ -51,17 +53,17 @@ public class GpsRealAnalyse { @@ -51,17 +53,17 @@ public class GpsRealAnalyse {
51 53
52 long t = System.currentTimeMillis(); 54 long t = System.currentTimeMillis();
53 logger.info("analyse gps size: " + list.size()); 55 logger.info("analyse gps size: " + list.size());
54 - //按车辆分组gps 56 + //按线路分组gps
55 ArrayListMultimap multimap = ArrayListMultimap.create(); 57 ArrayListMultimap multimap = ArrayListMultimap.create();
56 for(GpsEntity gps : list){ 58 for(GpsEntity gps : list){
57 - multimap.put(gps.getNbbm(), gps); 59 + multimap.put(gps.getLineId(), gps);
58 } 60 }
59 61
60 Set<String> ks = multimap.keySet(); 62 Set<String> ks = multimap.keySet();
61 CountDownLatch count = new CountDownLatch(ks.size()); 63 CountDownLatch count = new CountDownLatch(ks.size());
62 64
63 - for(String nbbm : ks){  
64 - threadPool.execute(new SignalHandleThread(multimap.get(nbbm), count)); 65 + for(String lineCode : ks){
  66 + threadPool.execute(new SignalHandleThread(multimap.get(lineCode), count));
65 } 67 }
66 68
67 try { 69 try {
@@ -78,6 +80,8 @@ public class GpsRealAnalyse { @@ -78,6 +80,8 @@ public class GpsRealAnalyse {
78 } 80 }
79 } 81 }
80 82
  83 + static GpsComp comp = new GpsComp();
  84 +
81 public class SignalHandleThread implements Runnable { 85 public class SignalHandleThread implements Runnable {
82 86
83 List<GpsEntity> list; 87 List<GpsEntity> list;
@@ -92,6 +96,7 @@ public class GpsRealAnalyse { @@ -92,6 +96,7 @@ public class GpsRealAnalyse {
92 public void run() { 96 public void run() {
93 97
94 try { 98 try {
  99 + Collections.sort(list, comp);
95 for(GpsEntity gps : list){ 100 for(GpsEntity gps : list){
96 //是否有任务 101 //是否有任务
97 boolean task; 102 boolean task;
@@ -122,4 +127,12 @@ public class GpsRealAnalyse { @@ -122,4 +127,12 @@ public class GpsRealAnalyse {
122 } 127 }
123 } 128 }
124 } 129 }
  130 +
  131 + public static class GpsComp implements Comparator<GpsEntity> {
  132 +
  133 + @Override
  134 + public int compare(GpsEntity g1, GpsEntity g2) {
  135 + return g1.getTimestamp().compareTo(g2.getTimestamp());
  136 + }
  137 + }
125 } 138 }
src/main/java/com/bsth/data/gpsdata/client/ClientApp.java
@@ -42,16 +42,7 @@ public class ClientApp { @@ -42,16 +42,7 @@ public class ClientApp {
42 static Logger logger = LoggerFactory.getLogger(ClientApp.class); 42 static Logger logger = LoggerFactory.getLogger(ClientApp.class);
43 private static ExecutorService exec; 43 private static ExecutorService exec;
44 44
45 - private ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {  
46 -  
47 - @Override  
48 - public Thread newThread(Runnable r) {  
49 - // TODO Auto-generated method stub  
50 - Thread t = new Thread(r);  
51 - t.setName("SessionCheckExecutor");  
52 - return t;  
53 - }  
54 - }); 45 + private ScheduledExecutorService sexec;
55 46
56 public static boolean dconnect(String device) { 47 public static boolean dconnect(String device) {
57 boolean flag = false; 48 boolean flag = false;
@@ -123,6 +114,20 @@ public class ClientApp { @@ -123,6 +114,20 @@ public class ClientApp {
123 pfconnect(ConfigUtil.get("forward.device.name")); 114 pfconnect(ConfigUtil.get("forward.device.name"));
124 } 115 }
125 116
  117 + public void destroy(){
  118 + try {
  119 + logger.warn("socket client destroy!!!");
  120 + exec.shutdownNow();
  121 + sexec.shutdownNow();
  122 +
  123 + pdDataConnector.dispose(true);
  124 + pfDataConnector.dispose(true);
  125 + } catch (Exception e) {
  126 + logger.error("", e);
  127 + }
  128 + }
  129 +
  130 +
126 public static void pdClose(){ 131 public static void pdClose(){
127 pdSession.closeNow(); 132 pdSession.closeNow();
128 } 133 }
@@ -149,7 +154,17 @@ public class ClientApp { @@ -149,7 +154,17 @@ public class ClientApp {
149 } 154 }
150 155
151 public void init() { 156 public void init() {
  157 + logger.warn("socket client init...");
152 exec = Executors.newFixedThreadPool(50); 158 exec = Executors.newFixedThreadPool(50);
  159 + sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
  160 + @Override
  161 + public Thread newThread(Runnable r) {
  162 + // TODO Auto-generated method stub
  163 + Thread t = new Thread(r);
  164 + t.setName("SessionCheckExecutor");
  165 + return t;
  166 + }
  167 + });
153 sexec.scheduleAtFixedRate(new SessionChecker(), 1, 1, TimeUnit.MINUTES); 168 sexec.scheduleAtFixedRate(new SessionChecker(), 1, 1, TimeUnit.MINUTES);
154 /*******************************浦东********************************/ 169 /*******************************浦东********************************/
155 pdDataConnector = new NioSocketConnector(); 170 pdDataConnector = new NioSocketConnector();
@@ -192,9 +207,6 @@ public class ClientApp { @@ -192,9 +207,6 @@ public class ClientApp {
192 207
193 pfDataConnector.setHandler(pfClient); 208 pfDataConnector.setHandler(pfClient);
194 pfconnect(ConfigUtil.get("forward.device.name")); 209 pfconnect(ConfigUtil.get("forward.device.name"));
195 -  
196 -  
197 - gpsBeforeBuffer.init();  
198 } 210 }
199 211
200 212
src/main/java/com/bsth/data/gpsdata/client/GpsBeforeBuffer.java
@@ -6,6 +6,8 @@ import com.bsth.data.gpsdata.GpsEntity; @@ -6,6 +6,8 @@ import com.bsth.data.gpsdata.GpsEntity;
6 import com.bsth.data.gpsdata.arrival.GpsRealAnalyse; 6 import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
7 import com.bsth.data.gpsdata.client.pd.protocol.BasicInfo; 7 import com.bsth.data.gpsdata.client.pd.protocol.BasicInfo;
8 import org.apache.commons.lang3.StringUtils; 8 import org.apache.commons.lang3.StringUtils;
  9 +import org.slf4j.Logger;
  10 +import org.slf4j.LoggerFactory;
9 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.stereotype.Component; 12 import org.springframework.stereotype.Component;
11 13
@@ -66,6 +68,7 @@ public class GpsBeforeBuffer { @@ -66,6 +68,7 @@ public class GpsBeforeBuffer {
66 Application.mainServices.scheduleWithFixedDelay(gpsHandleThread, 20 * 1000, 1200, TimeUnit.MILLISECONDS); 68 Application.mainServices.scheduleWithFixedDelay(gpsHandleThread, 20 * 1000, 1200, TimeUnit.MILLISECONDS);
67 } 69 }
68 70
  71 + static int idleCount = 0;
69 @Component 72 @Component
70 public static class GpsHandleThread extends Thread{ 73 public static class GpsHandleThread extends Thread{
71 74
@@ -74,6 +77,11 @@ public class GpsBeforeBuffer { @@ -74,6 +77,11 @@ public class GpsBeforeBuffer {
74 @Autowired 77 @Autowired
75 GpsRealAnalyse gpsRealAnalyse; 78 GpsRealAnalyse gpsRealAnalyse;
76 79
  80 + @Autowired
  81 + ClientApp clientApp;
  82 +
  83 + Logger log = LoggerFactory.getLogger(this.getClass());
  84 +
77 @Override 85 @Override
78 public void run() { 86 public void run() {
79 list = new ArrayList<>(200); 87 list = new ArrayList<>(200);
@@ -86,6 +94,16 @@ public class GpsBeforeBuffer { @@ -86,6 +94,16 @@ public class GpsBeforeBuffer {
86 list.add(gps); 94 list.add(gps);
87 } 95 }
88 96
  97 + if(list.size() == 0){
  98 + idleCount ++;
  99 + //连续40次没有数据,重建socket连接
  100 + if(idleCount == 40){
  101 + log.info("idleCount == 40");
  102 + idleCount = 0;
  103 + clientApp.destroy();
  104 + clientApp.init();
  105 + }
  106 + }
89 gpsRealAnalyse.analyse(list); 107 gpsRealAnalyse.analyse(list);
90 } 108 }
91 } 109 }
src/main/java/com/bsth/data/gpsdata/recovery/GpsDataRecovery.java
@@ -45,11 +45,11 @@ public class GpsDataRecovery implements ApplicationContextAware { @@ -45,11 +45,11 @@ public class GpsDataRecovery implements ApplicationContextAware {
45 public void recovery() { 45 public void recovery() {
46 List<GpsEntity> list = loadData(); 46 List<GpsEntity> list = loadData();
47 47
48 - //按车辆分组数据 48 + //按线路分组数据
49 ArrayListMultimap<String, GpsEntity> listMap = ArrayListMultimap.create(); 49 ArrayListMultimap<String, GpsEntity> listMap = ArrayListMultimap.create();
50 for (GpsEntity gps : list) { 50 for (GpsEntity gps : list) {
51 - if (gps.getNbbm() != null)  
52 - listMap.put(gps.getNbbm(), gps); 51 + if (gps.getLineId() != null)
  52 + listMap.put(gps.getLineId(), gps);
53 } 53 }
54 54
55 55
@@ -57,9 +57,9 @@ public class GpsDataRecovery implements ApplicationContextAware { @@ -57,9 +57,9 @@ public class GpsDataRecovery implements ApplicationContextAware {
57 57
58 CountDownLatch count = new CountDownLatch(keys.size()); 58 CountDownLatch count = new CountDownLatch(keys.size());
59 GpsComp comp = new GpsComp(); 59 GpsComp comp = new GpsComp();
60 - for (String nbbm : keys) {  
61 - Collections.sort(listMap.get(nbbm), comp);  
62 - threadPool.execute(new RecoveryThread(listMap.get(nbbm), count)); 60 + for (String lineId : keys) {
  61 + Collections.sort(listMap.get(lineId), comp);
  62 + threadPool.execute(new RecoveryThread(listMap.get(lineId), count));
63 /*if(nbbm.equals("W7C-001")) 63 /*if(nbbm.equals("W7C-001"))
64 new RecoveryThread(listMap.get(nbbm), count).run();*/ 64 new RecoveryThread(listMap.get(nbbm), count).run();*/
65 } 65 }