Commit 5feb311adcebcadd10c980faab4fb08bd2c874b5
1 parent
d1e3bd73
1.GpsCacheData中HashMap替换成并发Map
Showing
2 changed files
with
371 additions
and
390 deletions
src/main/java/com/bsth/data/gpsdata_v2/DataHandleProcess.java
| 1 | -package com.bsth.data.gpsdata_v2; | ||
| 2 | - | ||
| 3 | -import com.alibaba.fastjson.JSON; | ||
| 4 | -import com.bsth.data.gpsdata_v2.cache.GpsCacheData; | ||
| 5 | -import com.bsth.data.gpsdata_v2.entity.GpsEntity; | ||
| 6 | -import com.bsth.data.gpsdata_v2.handlers.*; | ||
| 7 | -import com.bsth.email.SendEmailController; | ||
| 8 | -import com.bsth.email.entity.EmailBean; | ||
| 9 | -import com.google.common.collect.ArrayListMultimap; | ||
| 10 | -import org.apache.commons.lang3.StringUtils; | ||
| 11 | -import org.slf4j.Logger; | ||
| 12 | -import org.slf4j.LoggerFactory; | ||
| 13 | -import org.springframework.beans.factory.annotation.Autowired; | ||
| 14 | -import org.springframework.stereotype.Component; | ||
| 15 | - | ||
| 16 | -import java.util.*; | ||
| 17 | -import java.util.concurrent.CountDownLatch; | ||
| 18 | -import java.util.concurrent.ExecutorService; | ||
| 19 | -import java.util.concurrent.Executors; | ||
| 20 | -import java.util.concurrent.ThreadFactory; | ||
| 21 | -import java.util.concurrent.TimeUnit; | ||
| 22 | - | ||
| 23 | -/** | ||
| 24 | - * 实时信号数据处理 | ||
| 25 | - * Created by panzhao on 2017/11/15. | ||
| 26 | - */ | ||
| 27 | -@Component | ||
| 28 | -public class DataHandleProcess { | ||
| 29 | - | ||
| 30 | - @Autowired | ||
| 31 | - GpsStateProcess gpsStateProcess; | ||
| 32 | - @Autowired | ||
| 33 | - StationInsideProcess stationInsideProcess; | ||
| 34 | - @Autowired | ||
| 35 | - AbnormalStateProcess abnormalStateProcess; | ||
| 36 | - @Autowired | ||
| 37 | - InStationProcess inStationProcess; | ||
| 38 | - @Autowired | ||
| 39 | - OutStationProcess outStationProcess; | ||
| 40 | - @Autowired | ||
| 41 | - ReverseRouteProcess reverseRouteProcess; | ||
| 42 | - @Autowired | ||
| 43 | - GpsRealData gpsRealData; | ||
| 44 | - // 发送邮件 | ||
| 45 | - @Autowired | ||
| 46 | - private SendEmailController sendEmailController; | ||
| 47 | - | ||
| 48 | - | ||
| 49 | - static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class); | ||
| 50 | - | ||
| 51 | - final static int POOL_SIZE = 20; | ||
| 52 | - | ||
| 53 | - static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() { | ||
| 54 | - | ||
| 55 | - @Override | ||
| 56 | - public Thread newThread(Runnable r) { | ||
| 57 | - // TODO Auto-generated method stub | ||
| 58 | - Thread t = new Thread(r); | ||
| 59 | - t.setName("GPSProcessor"); | ||
| 60 | - | ||
| 61 | - return t; | ||
| 62 | - } | ||
| 63 | - | ||
| 64 | - }); | ||
| 65 | - public static CountDownLatch count; | ||
| 66 | - | ||
| 67 | - static long lastTime; | ||
| 68 | - | ||
| 69 | - public static boolean isBlock() { | ||
| 70 | - return System.currentTimeMillis() - lastTime > 1000 * 30; | ||
| 71 | - } | ||
| 72 | - | ||
| 73 | - private void shutdownAndAwaitTermination(ExecutorService pool) { | ||
| 74 | - pool.shutdown(); | ||
| 75 | - try { | ||
| 76 | - if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS)) { | ||
| 77 | - pool.shutdownNow(); | ||
| 78 | - } | ||
| 79 | - if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS)) { | ||
| 80 | - logger.error("线程池无法正常终止"); | ||
| 81 | - } | ||
| 82 | - } catch (InterruptedException e) { | ||
| 83 | - pool.shutdown(); | ||
| 84 | - Thread.currentThread().interrupt(); | ||
| 85 | - } | ||
| 86 | - } | ||
| 87 | - | ||
| 88 | - public void handle(List<GpsEntity> list) { | ||
| 89 | - try { | ||
| 90 | - if (list.size() == 0) | ||
| 91 | - return; | ||
| 92 | - lastTime = System.currentTimeMillis(); | ||
| 93 | - //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑) | ||
| 94 | - ArrayListMultimap multimap = ArrayListMultimap.create(); | ||
| 95 | - for (GpsEntity gps : list) { | ||
| 96 | - multimap.put(gps.getDeviceId(), gps); | ||
| 97 | - } | ||
| 98 | - List<String> deviceList = new ArrayList<>(multimap.keySet()); | ||
| 99 | - | ||
| 100 | - //数据均分给线程 | ||
| 101 | - ArrayListMultimap dataListMap = ArrayListMultimap.create(); | ||
| 102 | - int size = deviceList.size(), threadIndex = 0, threadSize = size / POOL_SIZE; | ||
| 103 | - if(threadSize==0) | ||
| 104 | - threadSize = size; | ||
| 105 | - for (int i = 0; i < size; i++) { | ||
| 106 | - if (i % threadSize == 0) | ||
| 107 | - threadIndex++; | ||
| 108 | - dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i))); | ||
| 109 | - } | ||
| 110 | - Set<Integer> ks = dataListMap.keySet(); | ||
| 111 | - logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size()); | ||
| 112 | - count = new CountDownLatch(ks.size()); | ||
| 113 | - | ||
| 114 | - logger.info(JSON.toJSONString(ks)); | ||
| 115 | - for (Integer index : ks) { | ||
| 116 | - threadPool.execute(new SignalHandleThread(dataListMap.get(index), count)); | ||
| 117 | - } | ||
| 118 | - | ||
| 119 | - | ||
| 120 | - //等待子线程结束 | ||
| 121 | - boolean isNormal = count.await(5, TimeUnit.SECONDS); | ||
| 122 | - if (!isNormal) { | ||
| 123 | - try { | ||
| 124 | - shutdownAndAwaitTermination(threadPool); | ||
| 125 | - threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() { | ||
| 126 | - | ||
| 127 | - @Override | ||
| 128 | - public Thread newThread(Runnable r) { | ||
| 129 | - // TODO Auto-generated method stub | ||
| 130 | - Thread t = new Thread(r); | ||
| 131 | - t.setName("GPSProcessor"); | ||
| 132 | - | ||
| 133 | - return t; | ||
| 134 | - } | ||
| 135 | - | ||
| 136 | - }); | ||
| 137 | - //发送邮件 | ||
| 138 | - EmailBean mail = new EmailBean(); | ||
| 139 | - mail.setSubject("线调GPS处理"); | ||
| 140 | - mail.setContent("GPS处理超时,检查线程栈文件信息<br/>"); | ||
| 141 | - sendEmailController.sendMail("113252620@qq.com", mail); | ||
| 142 | - logger.info("DataHandlerProcess:邮件发送成功!"); | ||
| 143 | - } catch (Exception e){ | ||
| 144 | - logger.error("DataHandlerProcess:邮件发送失败!",e); | ||
| 145 | - } | ||
| 146 | - } | ||
| 147 | - | ||
| 148 | - //加入实时gps对照 | ||
| 149 | - for (GpsEntity gps : list) | ||
| 150 | - gpsRealData.put(gps); | ||
| 151 | - | ||
| 152 | - logger.info("time , " + (System.currentTimeMillis() - lastTime)); | ||
| 153 | - } catch (Exception e) { | ||
| 154 | - logger.error("", e); | ||
| 155 | - } | ||
| 156 | - } | ||
| 157 | - | ||
| 158 | - static GpsComp comp = new GpsComp(); | ||
| 159 | - | ||
| 160 | - public class SignalHandleThread implements Runnable { | ||
| 161 | - | ||
| 162 | - List<GpsEntity> list; | ||
| 163 | - CountDownLatch count; | ||
| 164 | - | ||
| 165 | - SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) { | ||
| 166 | - this.list = gpsList; | ||
| 167 | - this.count = count; | ||
| 168 | - } | ||
| 169 | - | ||
| 170 | - @Override | ||
| 171 | - public void run() { | ||
| 172 | - try { | ||
| 173 | - Collections.sort(list, comp); | ||
| 174 | - GpsEntity gps; | ||
| 175 | - for(int i = 0,len = list.size(); i< len ;i ++){ | ||
| 176 | - if (Thread.currentThread().isInterrupted()) break; | ||
| 177 | - gps = list.get(i); | ||
| 178 | - | ||
| 179 | - try { | ||
| 180 | - if (StringUtils.isEmpty(gps.getNbbm())) | ||
| 181 | - continue; | ||
| 182 | - if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20) | ||
| 183 | - continue; | ||
| 184 | - | ||
| 185 | - gpsStateProcess.process(gps);//状态处理 | ||
| 186 | - stationInsideProcess.process(gps);//场站内外判定 | ||
| 187 | - reverseRouteProcess.process(gps);//反向路由处理 | ||
| 188 | - abnormalStateProcess.process(gps);//超速越界 | ||
| 189 | - | ||
| 190 | - inStationProcess.process(gps);//进站 | ||
| 191 | - outStationProcess.process(gps);//出站 | ||
| 192 | - | ||
| 193 | - | ||
| 194 | - GpsCacheData.putGps(gps);//历史gps缓存 | ||
| 195 | - } catch (Throwable e) { | ||
| 196 | - logger.error("", e); | ||
| 197 | - } | ||
| 198 | - } | ||
| 199 | - } finally { | ||
| 200 | - if (count != null) | ||
| 201 | - count.countDown(); | ||
| 202 | - } | ||
| 203 | - } | ||
| 204 | - } | ||
| 205 | - | ||
| 206 | - public static class GpsComp implements Comparator<GpsEntity> { | ||
| 207 | - | ||
| 208 | - @Override | ||
| 209 | - public int compare(GpsEntity g1, GpsEntity g2) { | ||
| 210 | - return g1.getTimestamp().compareTo(g2.getTimestamp()); | ||
| 211 | - } | ||
| 212 | - } | 1 | +package com.bsth.data.gpsdata_v2; |
| 2 | + | ||
| 3 | +import com.alibaba.fastjson.JSON; | ||
| 4 | +import com.bsth.data.gpsdata_v2.cache.GpsCacheData; | ||
| 5 | +import com.bsth.data.gpsdata_v2.entity.GpsEntity; | ||
| 6 | +import com.bsth.data.gpsdata_v2.handlers.*; | ||
| 7 | +import com.bsth.email.SendEmailController; | ||
| 8 | +import com.bsth.email.entity.EmailBean; | ||
| 9 | +import com.google.common.collect.ArrayListMultimap; | ||
| 10 | +import org.apache.commons.lang3.StringUtils; | ||
| 11 | +import org.slf4j.Logger; | ||
| 12 | +import org.slf4j.LoggerFactory; | ||
| 13 | +import org.springframework.beans.factory.annotation.Autowired; | ||
| 14 | +import org.springframework.stereotype.Component; | ||
| 15 | + | ||
| 16 | +import java.util.*; | ||
| 17 | +import java.util.concurrent.CountDownLatch; | ||
| 18 | +import java.util.concurrent.ExecutorService; | ||
| 19 | +import java.util.concurrent.Executors; | ||
| 20 | +import java.util.concurrent.ThreadFactory; | ||
| 21 | +import java.util.concurrent.TimeUnit; | ||
| 22 | + | ||
| 23 | +/** | ||
| 24 | + * 实时信号数据处理 | ||
| 25 | + * Created by panzhao on 2017/11/15. | ||
| 26 | + */ | ||
| 27 | +@Component | ||
| 28 | +public class DataHandleProcess { | ||
| 29 | + | ||
| 30 | + @Autowired | ||
| 31 | + GpsStateProcess gpsStateProcess; | ||
| 32 | + @Autowired | ||
| 33 | + StationInsideProcess stationInsideProcess; | ||
| 34 | + @Autowired | ||
| 35 | + AbnormalStateProcess abnormalStateProcess; | ||
| 36 | + @Autowired | ||
| 37 | + InStationProcess inStationProcess; | ||
| 38 | + @Autowired | ||
| 39 | + OutStationProcess outStationProcess; | ||
| 40 | + @Autowired | ||
| 41 | + ReverseRouteProcess reverseRouteProcess; | ||
| 42 | + @Autowired | ||
| 43 | + GpsRealData gpsRealData; | ||
| 44 | + // 发送邮件 | ||
| 45 | + @Autowired | ||
| 46 | + private SendEmailController sendEmailController; | ||
| 47 | + | ||
| 48 | + | ||
| 49 | + static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class); | ||
| 50 | + | ||
| 51 | + final static int POOL_SIZE = 20; | ||
| 52 | + | ||
| 53 | + static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() { | ||
| 54 | + | ||
| 55 | + @Override | ||
| 56 | + public Thread newThread(Runnable r) { | ||
| 57 | + // TODO Auto-generated method stub | ||
| 58 | + Thread t = new Thread(r); | ||
| 59 | + t.setName("GPSProcessor"); | ||
| 60 | + | ||
| 61 | + return t; | ||
| 62 | + } | ||
| 63 | + | ||
| 64 | + }); | ||
| 65 | + | ||
| 66 | + static long lastTime; | ||
| 67 | + | ||
| 68 | + public static boolean isBlock() { | ||
| 69 | + return System.currentTimeMillis() - lastTime > 1000 * 30; | ||
| 70 | + } | ||
| 71 | + | ||
| 72 | + public void handle(List<GpsEntity> list) { | ||
| 73 | + try { | ||
| 74 | + if (list.size() == 0) | ||
| 75 | + return; | ||
| 76 | + lastTime = System.currentTimeMillis(); | ||
| 77 | + //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑) | ||
| 78 | + ArrayListMultimap multimap = ArrayListMultimap.create(); | ||
| 79 | + for (GpsEntity gps : list) { | ||
| 80 | + multimap.put(gps.getDeviceId(), gps); | ||
| 81 | + } | ||
| 82 | + List<String> deviceList = new ArrayList<>(multimap.keySet()); | ||
| 83 | + | ||
| 84 | + //数据均分给线程 | ||
| 85 | + ArrayListMultimap dataListMap = ArrayListMultimap.create(); | ||
| 86 | + int size = deviceList.size(), threadIndex = 0, threadSize = size / POOL_SIZE; | ||
| 87 | + if(threadSize==0) | ||
| 88 | + threadSize = size; | ||
| 89 | + for (int i = 0; i < size; i++) { | ||
| 90 | + if (i % threadSize == 0) | ||
| 91 | + threadIndex++; | ||
| 92 | + dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i))); | ||
| 93 | + } | ||
| 94 | + Set<Integer> ks = dataListMap.keySet(); | ||
| 95 | + logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size()); | ||
| 96 | + CountDownLatch count = new CountDownLatch(ks.size()); | ||
| 97 | + | ||
| 98 | + logger.info(JSON.toJSONString(ks)); | ||
| 99 | + for (Integer index : ks) { | ||
| 100 | + threadPool.execute(new SignalHandleThread(dataListMap.get(index), count)); | ||
| 101 | + } | ||
| 102 | + | ||
| 103 | + | ||
| 104 | + //等待子线程结束 | ||
| 105 | + boolean isNormal = count.await(1500, TimeUnit.MILLISECONDS); | ||
| 106 | + if (!isNormal) { | ||
| 107 | + try { | ||
| 108 | + //发送邮件 | ||
| 109 | + EmailBean mail = new EmailBean(); | ||
| 110 | + mail.setSubject("线调GPS处理"); | ||
| 111 | + mail.setContent("GPS处理超时,检查线程栈文件信息<br/>"); | ||
| 112 | + sendEmailController.sendMail("113252620@qq.com", mail); | ||
| 113 | + logger.info("DataHandlerProcess:邮件发送成功!"); | ||
| 114 | + } catch (Exception e){ | ||
| 115 | + logger.error("DataHandlerProcess:邮件发送失败!",e); | ||
| 116 | + } | ||
| 117 | + } | ||
| 118 | + | ||
| 119 | + //加入实时gps对照 | ||
| 120 | + for (GpsEntity gps : list) | ||
| 121 | + gpsRealData.put(gps); | ||
| 122 | + | ||
| 123 | + logger.info("time , " + (System.currentTimeMillis() - lastTime)); | ||
| 124 | + } catch (Exception e) { | ||
| 125 | + logger.error("", e); | ||
| 126 | + } | ||
| 127 | + } | ||
| 128 | + | ||
| 129 | + static GpsComp comp = new GpsComp(); | ||
| 130 | + | ||
| 131 | + public class SignalHandleThread implements Runnable { | ||
| 132 | + | ||
| 133 | + List<GpsEntity> list; | ||
| 134 | + CountDownLatch count; | ||
| 135 | + | ||
| 136 | + SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) { | ||
| 137 | + this.list = gpsList; | ||
| 138 | + this.count = count; | ||
| 139 | + } | ||
| 140 | + | ||
| 141 | + @Override | ||
| 142 | + public void run() { | ||
| 143 | + long start = System.currentTimeMillis(); | ||
| 144 | + try { | ||
| 145 | + Collections.sort(list, comp); | ||
| 146 | + GpsEntity gps; | ||
| 147 | + for(int i = 0,len = list.size(); i< len ;i ++){ | ||
| 148 | + if (Thread.currentThread().isInterrupted()) break; | ||
| 149 | + gps = list.get(i); | ||
| 150 | + | ||
| 151 | + try { | ||
| 152 | + if (StringUtils.isEmpty(gps.getNbbm())) | ||
| 153 | + continue; | ||
| 154 | + if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20) | ||
| 155 | + continue; | ||
| 156 | + | ||
| 157 | + gpsStateProcess.process(gps);//状态处理 | ||
| 158 | + stationInsideProcess.process(gps);//场站内外判定 | ||
| 159 | + reverseRouteProcess.process(gps);//反向路由处理 | ||
| 160 | + abnormalStateProcess.process(gps);//超速越界 | ||
| 161 | + | ||
| 162 | + inStationProcess.process(gps);//进站 | ||
| 163 | + outStationProcess.process(gps);//出站 | ||
| 164 | + | ||
| 165 | + | ||
| 166 | + GpsCacheData.putGps(gps);//历史gps缓存 | ||
| 167 | + } catch (Throwable e) { | ||
| 168 | + logger.error("SignalHandleThread.run1", e); | ||
| 169 | + } | ||
| 170 | + } | ||
| 171 | + } catch (Exception e) { | ||
| 172 | + logger.error("SignalHandleThread.run2", e); | ||
| 173 | + } finally { | ||
| 174 | + if (count != null) | ||
| 175 | + count.countDown(); | ||
| 176 | + | ||
| 177 | + StringBuilder sb = new StringBuilder(); | ||
| 178 | + sb.append("list size:").append(list.size()).append(" cost:").append(System.currentTimeMillis() - start); | ||
| 179 | + logger.info(sb.toString()); | ||
| 180 | + } | ||
| 181 | + } | ||
| 182 | + } | ||
| 183 | + | ||
| 184 | + public static class GpsComp implements Comparator<GpsEntity> { | ||
| 185 | + | ||
| 186 | + @Override | ||
| 187 | + public int compare(GpsEntity g1, GpsEntity g2) { | ||
| 188 | + return g1.getTimestamp().compareTo(g2.getTimestamp()); | ||
| 189 | + } | ||
| 190 | + } | ||
| 213 | } | 191 | } |
| 214 | \ No newline at end of file | 192 | \ No newline at end of file |
src/main/java/com/bsth/data/gpsdata_v2/cache/GpsCacheData.java
| 1 | -package com.bsth.data.gpsdata_v2.cache; | ||
| 2 | - | ||
| 3 | -import com.bsth.data.gpsdata_v2.entity.GpsEntity; | ||
| 4 | -import com.bsth.data.gpsdata_v2.entity.StationRoute; | ||
| 5 | -import com.bsth.data.gpsdata_v2.entity.trail.GpsExecTrail; | ||
| 6 | -import com.bsth.data.gpsdata_v2.utils.CircleQueue; | ||
| 7 | -import com.google.common.collect.ArrayListMultimap; | ||
| 8 | -import org.slf4j.Logger; | ||
| 9 | -import org.slf4j.LoggerFactory; | ||
| 10 | - | ||
| 11 | -import java.util.*; | ||
| 12 | -import java.util.concurrent.ConcurrentHashMap; | ||
| 13 | -import java.util.concurrent.ConcurrentMap; | ||
| 14 | - | ||
| 15 | -/** | ||
| 16 | - * gps 数据缓存 | ||
| 17 | - * Created by panzhao on 2017/11/15. | ||
| 18 | - */ | ||
| 19 | -public class GpsCacheData { | ||
| 20 | - | ||
| 21 | - /** | ||
| 22 | - * 每辆车缓存最后200条gps | ||
| 23 | - */ | ||
| 24 | - private static final int CACHE_SIZE = 200; | ||
| 25 | - private static ConcurrentMap<String, CircleQueue<GpsEntity>> gpsCacheMap = new ConcurrentHashMap<>(); | ||
| 26 | - | ||
| 27 | - /** | ||
| 28 | - * 车辆执行班次的详细 进出站数据 | ||
| 29 | - */ | ||
| 30 | - private static ArrayListMultimap<String, GpsExecTrail> trailListMultimap = ArrayListMultimap.create(); | ||
| 31 | - | ||
| 32 | - static Logger logger = LoggerFactory.getLogger(GpsCacheData.class); | ||
| 33 | - | ||
| 34 | - public static CircleQueue<GpsEntity> getGps(String device) { | ||
| 35 | - return gpsCacheMap.get(device); | ||
| 36 | - } | ||
| 37 | - | ||
| 38 | - /** | ||
| 39 | - * 清除车辆的轨迹数据 | ||
| 40 | - * @param nbbm | ||
| 41 | - */ | ||
| 42 | - public static void remove(String nbbm){ | ||
| 43 | - //logger.info("清除车辆到离站轨迹, " + nbbm); | ||
| 44 | - trailListMultimap.removeAll(nbbm); | ||
| 45 | - } | ||
| 46 | - | ||
| 47 | - public static GpsExecTrail gpsExecTrail(String nbbm){ | ||
| 48 | - List<GpsExecTrail> list = trailListMultimap.get(nbbm); | ||
| 49 | - | ||
| 50 | - GpsExecTrail trail; | ||
| 51 | - if(null == list || list.size() == 0 | ||
| 52 | - || list.get(list.size() - 1).isEnd()){ | ||
| 53 | - trail = new GpsExecTrail(); | ||
| 54 | - | ||
| 55 | - trailListMultimap.put(nbbm, trail); | ||
| 56 | - } | ||
| 57 | - else{ | ||
| 58 | - trail = list.get(list.size() - 1); | ||
| 59 | - } | ||
| 60 | - | ||
| 61 | - return trail; | ||
| 62 | - } | ||
| 63 | - | ||
| 64 | - public static void out(GpsEntity gps){ | ||
| 65 | - GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | ||
| 66 | - trail.getSrs().add(gps); | ||
| 67 | - | ||
| 68 | - } | ||
| 69 | - | ||
| 70 | - public static void in(GpsEntity gps, boolean end){ | ||
| 71 | - GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | ||
| 72 | - trail.getSrs().add(gps); | ||
| 73 | - trail.setEnd(end); | ||
| 74 | - } | ||
| 75 | - | ||
| 76 | - public static GpsEntity getPrev(GpsEntity gps){ | ||
| 77 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | ||
| 78 | - if(queue != null) | ||
| 79 | - return queue.getTail(); | ||
| 80 | - return null; | ||
| 81 | - } | ||
| 82 | - | ||
| 83 | - public static void putGps(GpsEntity gps) { | ||
| 84 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | ||
| 85 | - if (queue == null) { | ||
| 86 | - queue = new CircleQueue<>(CACHE_SIZE); | ||
| 87 | - gpsCacheMap.put(gps.getDeviceId(), queue); | ||
| 88 | - } | ||
| 89 | - queue.add(gps); | ||
| 90 | - } | ||
| 91 | - | ||
| 92 | - public static void clear(String deviceId) { | ||
| 93 | - try { | ||
| 94 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(deviceId); | ||
| 95 | - if (queue != null) | ||
| 96 | - queue.clear(); | ||
| 97 | - } catch (Exception e) { | ||
| 98 | - logger.error("", e); | ||
| 99 | - } | ||
| 100 | - } | ||
| 101 | - | ||
| 102 | - public static StationRoute prevStation(GpsEntity gps) { | ||
| 103 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 104 | - if(null == trails || trails.size() == 0) | ||
| 105 | - return null; | ||
| 106 | - | ||
| 107 | - GpsEntity prev; | ||
| 108 | - for(int i = trails.size() - 1; i > 0; i--){ | ||
| 109 | - prev = trails.get(i).getSrs().peekLast(); | ||
| 110 | - | ||
| 111 | - if(prev != null){ | ||
| 112 | - return GeoCacheData.getRouteCode(prev); | ||
| 113 | - } | ||
| 114 | - } | ||
| 115 | - return null; | ||
| 116 | - } | ||
| 117 | - | ||
| 118 | - /** | ||
| 119 | - * 最后一段轨迹 | ||
| 120 | - * @param gps | ||
| 121 | - * @return | ||
| 122 | - */ | ||
| 123 | - public static int lastInTrailsSize(GpsEntity gps) { | ||
| 124 | - //int size = 0; | ||
| 125 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 126 | - if(null == trails || trails.size() == 0) | ||
| 127 | - return 0; | ||
| 128 | - | ||
| 129 | - GpsExecTrail gs = trails.get(trails.size() - 1); | ||
| 130 | - if(gs.isEnd()) | ||
| 131 | - return 0; | ||
| 132 | - | ||
| 133 | - Set<String> set = new HashSet<>(); | ||
| 134 | - for(GpsEntity g : gs.getSrs()){ | ||
| 135 | - if(g.getInstation() == 1) | ||
| 136 | - set.add(g.getStation().getName()); | ||
| 137 | - } | ||
| 138 | - return set.size(); | ||
| 139 | - } | ||
| 140 | - | ||
| 141 | - public static List<StationRoute> prevMultiStation(GpsEntity gps) { | ||
| 142 | - List<StationRoute> rs = new ArrayList<>(); | ||
| 143 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 144 | - if(null == trails || trails.size() == 0) | ||
| 145 | - return null; | ||
| 146 | - | ||
| 147 | - for(int i = trails.size() - 1; i > 0; i--){ | ||
| 148 | - rs.addAll(searchLinked(trails.get(i).getSrs(), gps.getUpDown())); | ||
| 149 | - if(rs.size() > 3) | ||
| 150 | - break; | ||
| 151 | - } | ||
| 152 | - return rs; | ||
| 153 | - } | ||
| 154 | - | ||
| 155 | - private static List<StationRoute> searchLinked(LinkedList<GpsEntity> list, int upDown){ | ||
| 156 | - List<StationRoute> rs = new ArrayList<>(); | ||
| 157 | - | ||
| 158 | - GpsEntity gps; | ||
| 159 | - int prevCode=0; | ||
| 160 | - for(int i = list.size() - 1; i > 0; i --){ | ||
| 161 | - gps = list.get(i); | ||
| 162 | - if(gps.getInstation()!=1) | ||
| 163 | - continue; | ||
| 164 | - | ||
| 165 | - if(gps.getUpDown() != upDown) | ||
| 166 | - break; | ||
| 167 | - | ||
| 168 | - if(gps.getStation().getRouteSort() != prevCode) | ||
| 169 | - rs.add(gps.getStation()); | ||
| 170 | - | ||
| 171 | - prevCode = gps.getStation().getRouteSort(); | ||
| 172 | - | ||
| 173 | - if(rs.size() >= 3) | ||
| 174 | - break; | ||
| 175 | - } | ||
| 176 | - return rs; | ||
| 177 | - } | ||
| 178 | -} | 1 | +package com.bsth.data.gpsdata_v2.cache; |
| 2 | + | ||
| 3 | +import com.bsth.data.gpsdata_v2.entity.GpsEntity; | ||
| 4 | +import com.bsth.data.gpsdata_v2.entity.StationRoute; | ||
| 5 | +import com.bsth.data.gpsdata_v2.entity.trail.GpsExecTrail; | ||
| 6 | +import com.bsth.data.gpsdata_v2.utils.CircleQueue; | ||
| 7 | +import com.google.common.collect.ArrayListMultimap; | ||
| 8 | +import org.slf4j.Logger; | ||
| 9 | +import org.slf4j.LoggerFactory; | ||
| 10 | + | ||
| 11 | +import java.util.*; | ||
| 12 | +import java.util.concurrent.ConcurrentHashMap; | ||
| 13 | +import java.util.concurrent.ConcurrentMap; | ||
| 14 | +import java.util.concurrent.CopyOnWriteArrayList; | ||
| 15 | + | ||
| 16 | +/** | ||
| 17 | + * gps 数据缓存 | ||
| 18 | + * Created by panzhao on 2017/11/15. | ||
| 19 | + */ | ||
| 20 | +public class GpsCacheData { | ||
| 21 | + | ||
| 22 | + /** | ||
| 23 | + * 每辆车缓存最后200条gps | ||
| 24 | + */ | ||
| 25 | + private static final int CACHE_SIZE = 200; | ||
| 26 | + private static ConcurrentMap<String, CircleQueue<GpsEntity>> gpsCacheMap = new ConcurrentHashMap<>(); | ||
| 27 | + | ||
| 28 | + /** | ||
| 29 | + * 车辆执行班次的详细 进出站数据 | ||
| 30 | + */ | ||
| 31 | + //private static ArrayListMultimap<String, GpsExecTrail> trailListMultimap = ArrayListMultimap.create(); | ||
| 32 | + private static Map<String, List<GpsExecTrail>> trailListMultimap = new ConcurrentHashMap<>(); | ||
| 33 | + | ||
| 34 | + static Logger logger = LoggerFactory.getLogger(GpsCacheData.class); | ||
| 35 | + | ||
| 36 | + public static CircleQueue<GpsEntity> getGps(String device) { | ||
| 37 | + return gpsCacheMap.get(device); | ||
| 38 | + } | ||
| 39 | + | ||
| 40 | + /** | ||
| 41 | + * 清除车辆的轨迹数据 | ||
| 42 | + * @param nbbm | ||
| 43 | + */ | ||
| 44 | + public static void remove(String nbbm){ | ||
| 45 | + //logger.info("清除车辆到离站轨迹, " + nbbm); | ||
| 46 | + trailListMultimap.remove(nbbm); | ||
| 47 | + } | ||
| 48 | + | ||
| 49 | + public static GpsExecTrail gpsExecTrail(String nbbm){ | ||
| 50 | + List<GpsExecTrail> list = trailListMultimap.get(nbbm); | ||
| 51 | + | ||
| 52 | + GpsExecTrail trail; | ||
| 53 | + if (null == list || list.size() == 0 || list.get(list.size() - 1).isEnd()) { | ||
| 54 | + if (null == list) { | ||
| 55 | + list = new CopyOnWriteArrayList<>(); | ||
| 56 | + trailListMultimap.put(nbbm, list); | ||
| 57 | + } | ||
| 58 | + trail = new GpsExecTrail(); | ||
| 59 | + list.add(trail); | ||
| 60 | + } else{ | ||
| 61 | + trail = list.get(list.size() - 1); | ||
| 62 | + } | ||
| 63 | + | ||
| 64 | + return trail; | ||
| 65 | + } | ||
| 66 | + | ||
| 67 | + public static void out(GpsEntity gps){ | ||
| 68 | + GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | ||
| 69 | + trail.getSrs().add(gps); | ||
| 70 | + | ||
| 71 | + } | ||
| 72 | + | ||
| 73 | + public static void in(GpsEntity gps, boolean end){ | ||
| 74 | + GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | ||
| 75 | + trail.getSrs().add(gps); | ||
| 76 | + trail.setEnd(end); | ||
| 77 | + } | ||
| 78 | + | ||
| 79 | + public static GpsEntity getPrev(GpsEntity gps){ | ||
| 80 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | ||
| 81 | + if(queue != null) | ||
| 82 | + return queue.getTail(); | ||
| 83 | + return null; | ||
| 84 | + } | ||
| 85 | + | ||
| 86 | + public static void putGps(GpsEntity gps) { | ||
| 87 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | ||
| 88 | + if (queue == null) { | ||
| 89 | + queue = new CircleQueue<>(CACHE_SIZE); | ||
| 90 | + gpsCacheMap.put(gps.getDeviceId(), queue); | ||
| 91 | + } | ||
| 92 | + queue.add(gps); | ||
| 93 | + } | ||
| 94 | + | ||
| 95 | + public static void clear(String deviceId) { | ||
| 96 | + try { | ||
| 97 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(deviceId); | ||
| 98 | + if (queue != null) | ||
| 99 | + queue.clear(); | ||
| 100 | + } catch (Exception e) { | ||
| 101 | + logger.error("", e); | ||
| 102 | + } | ||
| 103 | + } | ||
| 104 | + | ||
| 105 | + public static StationRoute prevStation(GpsEntity gps) { | ||
| 106 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 107 | + if(null == trails || trails.size() == 0) | ||
| 108 | + return null; | ||
| 109 | + | ||
| 110 | + GpsEntity prev; | ||
| 111 | + for(int i = trails.size() - 1; i > 0; i--){ | ||
| 112 | + prev = trails.get(i).getSrs().peekLast(); | ||
| 113 | + | ||
| 114 | + if(prev != null){ | ||
| 115 | + return GeoCacheData.getRouteCode(prev); | ||
| 116 | + } | ||
| 117 | + } | ||
| 118 | + return null; | ||
| 119 | + } | ||
| 120 | + | ||
| 121 | + /** | ||
| 122 | + * 最后一段轨迹 | ||
| 123 | + * @param gps | ||
| 124 | + * @return | ||
| 125 | + */ | ||
| 126 | + public static int lastInTrailsSize(GpsEntity gps) { | ||
| 127 | + //int size = 0; | ||
| 128 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 129 | + if(null == trails || trails.size() == 0) | ||
| 130 | + return 0; | ||
| 131 | + | ||
| 132 | + GpsExecTrail gs = trails.get(trails.size() - 1); | ||
| 133 | + if(gs.isEnd()) | ||
| 134 | + return 0; | ||
| 135 | + | ||
| 136 | + Set<String> set = new HashSet<>(); | ||
| 137 | + for(GpsEntity g : gs.getSrs()){ | ||
| 138 | + if(g.getInstation() == 1) | ||
| 139 | + set.add(g.getStation().getName()); | ||
| 140 | + } | ||
| 141 | + return set.size(); | ||
| 142 | + } | ||
| 143 | + | ||
| 144 | + public static List<StationRoute> prevMultiStation(GpsEntity gps) { | ||
| 145 | + List<StationRoute> rs = new ArrayList<>(); | ||
| 146 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | ||
| 147 | + if(null == trails || trails.size() == 0) | ||
| 148 | + return null; | ||
| 149 | + | ||
| 150 | + for(int i = trails.size() - 1; i > 0; i--){ | ||
| 151 | + rs.addAll(searchLinked(trails.get(i).getSrs(), gps.getUpDown())); | ||
| 152 | + if(rs.size() > 3) | ||
| 153 | + break; | ||
| 154 | + } | ||
| 155 | + return rs; | ||
| 156 | + } | ||
| 157 | + | ||
| 158 | + private static List<StationRoute> searchLinked(LinkedList<GpsEntity> list, int upDown){ | ||
| 159 | + List<StationRoute> rs = new ArrayList<>(); | ||
| 160 | + | ||
| 161 | + GpsEntity gps; | ||
| 162 | + int prevCode=0; | ||
| 163 | + for(int i = list.size() - 1; i > 0; i --){ | ||
| 164 | + gps = list.get(i); | ||
| 165 | + if(gps.getInstation()!=1) | ||
| 166 | + continue; | ||
| 167 | + | ||
| 168 | + if(gps.getUpDown() != upDown) | ||
| 169 | + break; | ||
| 170 | + | ||
| 171 | + if(gps.getStation().getRouteSort() != prevCode) | ||
| 172 | + rs.add(gps.getStation()); | ||
| 173 | + | ||
| 174 | + prevCode = gps.getStation().getRouteSort(); | ||
| 175 | + | ||
| 176 | + if(rs.size() >= 3) | ||
| 177 | + break; | ||
| 178 | + } | ||
| 179 | + return rs; | ||
| 180 | + } | ||
| 181 | +} |