Commit 8b6cadec8c24af8f233b52544493781913ca9f85

Authored by 潘钊
1 parent 72fd6f2d

update...

src/main/java/com/bsth/data/gpsdata_v2/DataHandleProcess.java
@@ -4,18 +4,14 @@ import com.bsth.data.gpsdata_v2.cache.GpsCacheData; @@ -4,18 +4,14 @@ import com.bsth.data.gpsdata_v2.cache.GpsCacheData;
4 import com.bsth.data.gpsdata_v2.entity.GpsEntity; 4 import com.bsth.data.gpsdata_v2.entity.GpsEntity;
5 import com.bsth.data.gpsdata_v2.handlers.*; 5 import com.bsth.data.gpsdata_v2.handlers.*;
6 import com.google.common.collect.ArrayListMultimap; 6 import com.google.common.collect.ArrayListMultimap;
  7 +import org.apache.commons.lang3.StringUtils;
7 import org.slf4j.Logger; 8 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory; 9 import org.slf4j.LoggerFactory;
9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.stereotype.Component; 11 import org.springframework.stereotype.Component;
11 12
12 -import java.util.ArrayList;  
13 -import java.util.List;  
14 -import java.util.Set;  
15 -import java.util.concurrent.CountDownLatch;  
16 -import java.util.concurrent.ExecutorService;  
17 -import java.util.concurrent.Executors;  
18 -import java.util.concurrent.ThreadFactory; 13 +import java.util.*;
  14 +import java.util.concurrent.*;
19 15
20 /** 16 /**
21 * 实时信号数据处理 17 * 实时信号数据处理
@@ -24,12 +20,6 @@ import java.util.concurrent.ThreadFactory; @@ -24,12 +20,6 @@ import java.util.concurrent.ThreadFactory;
24 @Component 20 @Component
25 public class DataHandleProcess { 21 public class DataHandleProcess {
26 22
27 - static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class);  
28 - final static int POOL_SIZE = 25;  
29 -  
30 - static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new HandlerThreadFactory());  
31 - public static CountDownLatch count;  
32 -  
33 @Autowired 23 @Autowired
34 GpsStateProcess gpsStateProcess; 24 GpsStateProcess gpsStateProcess;
35 @Autowired 25 @Autowired
@@ -42,10 +32,17 @@ public class DataHandleProcess { @@ -42,10 +32,17 @@ public class DataHandleProcess {
42 OutStationProcess outStationProcess; 32 OutStationProcess outStationProcess;
43 @Autowired 33 @Autowired
44 ReverseRouteProcess reverseRouteProcess; 34 ReverseRouteProcess reverseRouteProcess;
45 -  
46 @Autowired 35 @Autowired
47 GpsRealData gpsRealData; 36 GpsRealData gpsRealData;
48 37
  38 +
  39 + static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class);
  40 +
  41 + final static int POOL_SIZE = 25;
  42 +
  43 + static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1);
  44 + public static CountDownLatch count;
  45 +
49 static long lastTime; 46 static long lastTime;
50 47
51 public static boolean isBlock() { 48 public static boolean isBlock() {
@@ -54,7 +51,7 @@ public class DataHandleProcess { @@ -54,7 +51,7 @@ public class DataHandleProcess {
54 51
55 public void handle(List<GpsEntity> list) { 52 public void handle(List<GpsEntity> list) {
56 try { 53 try {
57 - if(list.size() == 0) 54 + if (list.size() == 0)
58 return; 55 return;
59 lastTime = System.currentTimeMillis(); 56 lastTime = System.currentTimeMillis();
60 //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑) 57 //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑)
@@ -76,21 +73,50 @@ public class DataHandleProcess { @@ -76,21 +73,50 @@ public class DataHandleProcess {
76 logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size()); 73 logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
77 count = new CountDownLatch(ks.size()); 74 count = new CountDownLatch(ks.size());
78 75
  76 + List<Future> fRs = new ArrayList<>(ks.size());
79 for (Integer index : ks) { 77 for (Integer index : ks) {
80 - threadPool.execute(new SignalHandleThread(dataListMap.get(index), count)); 78 + fRs.add(threadPool.submit(new SignalHandleThread(dataListMap.get(index), count)));
81 } 79 }
82 80
  81 + //按线路分组gps
  82 + /*ArrayListMultimap multimap = ArrayListMultimap.create();
  83 + for (GpsEntity gps : list) {
  84 + multimap.put(gps.getLineId(), gps);
  85 + }
  86 +
  87 + Set<String> ks = multimap.keySet();
  88 +
  89 + logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
  90 + count = new CountDownLatch(ks.size());
  91 +
  92 + for (String lineCode : ks) {
  93 + threadPool.execute(new SignalHandleThread(multimap.get(lineCode), count));
  94 + }*/
  95 +
83 //等待子线程结束 96 //等待子线程结束
84 count.await(); 97 count.await();
85 98
  99 + for (Future f : fRs) {
  100 + try {
  101 + f.get();
  102 + } catch (InterruptedException e) {
  103 + } catch (ExecutionException e) {
  104 + logger.error(e.getCause().getMessage());
  105 + }
  106 + }
  107 +
86 //加入实时gps对照 108 //加入实时gps对照
87 for (GpsEntity gps : list) 109 for (GpsEntity gps : list)
88 gpsRealData.put(gps); 110 gpsRealData.put(gps);
  111 +
  112 + logger.info("time , " + (System.currentTimeMillis() - lastTime));
89 } catch (Exception e) { 113 } catch (Exception e) {
90 logger.error("", e); 114 logger.error("", e);
91 } 115 }
92 } 116 }
93 117
  118 + static GpsComp comp = new GpsComp();
  119 +
94 public class SignalHandleThread implements Runnable { 120 public class SignalHandleThread implements Runnable {
95 121
96 List<GpsEntity> list; 122 List<GpsEntity> list;
@@ -103,45 +129,44 @@ public class DataHandleProcess { @@ -103,45 +129,44 @@ public class DataHandleProcess {
103 129
104 @Override 130 @Override
105 public void run() { 131 public void run() {
106 - try {  
107 - for (GpsEntity gps : list) {  
108 - try{  
109 - if(Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20)  
110 - continue;  
111 -  
112 - gpsStateProcess.process(gps);//状态处理  
113 - stationInsideProcess.process(gps);//场站内外判定  
114 - reverseRouteProcess.process(gps);//反向路由处理  
115 - abnormalStateProcess.process(gps);//超速越界  
116 -  
117 - inStationProcess.process(gps);//进站  
118 - outStationProcess.process(gps);//出站  
119 -  
120 - GpsCacheData.putGps(gps);//历史gps缓存  
121 - }catch (Exception e){  
122 - logger.error("", e);  
123 - } 132 + //try {
  133 + Collections.sort(list, comp);
  134 + for (GpsEntity gps : list) {
  135 + try {
  136 + if(StringUtils.isEmpty(gps.getNbbm()))
  137 + continue;
  138 + if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20)
  139 + continue;
  140 +
  141 + gpsStateProcess.process(gps);//状态处理
  142 + stationInsideProcess.process(gps);//场站内外判定
  143 + reverseRouteProcess.process(gps);//反向路由处理
  144 + abnormalStateProcess.process(gps);//超速越界
  145 +
  146 + inStationProcess.process(gps);//进站
  147 + outStationProcess.process(gps);//出站
  148 +
  149 + GpsCacheData.putGps(gps);//历史gps缓存
  150 + } catch (Exception e) {
  151 + logger.error("", e);
124 } 152 }
125 - } finally { 153 + }
  154 +
  155 + logger.info(Thread.currentThread().getName() + " -countDown : " + count.getCount());
  156 + count.countDown();
  157 + /*} finally {
126 if (count != null) 158 if (count != null)
127 count.countDown(); 159 count.countDown();
128 - } 160 + logger.info(Thread.currentThread().getName() + " -countDown : " + count.getCount());
  161 + }*/
129 } 162 }
130 } 163 }
131 164
132 - static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {  
133 - @Override  
134 - public void uncaughtException(Thread t, Throwable e) {  
135 - logger.error("caught " , e);  
136 - }  
137 - } 165 + public static class GpsComp implements Comparator<GpsEntity> {
138 166
139 - static class HandlerThreadFactory implements ThreadFactory {  
140 @Override 167 @Override
141 - public Thread newThread(Runnable r) {  
142 - Thread t = new Thread(r);  
143 - t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());  
144 - return t; 168 + public int compare(GpsEntity g1, GpsEntity g2) {
  169 + return g1.getTimestamp().compareTo(g2.getTimestamp());
145 } 170 }
146 } 171 }
147 } 172 }
148 \ No newline at end of file 173 \ No newline at end of file