Commit 929bfdaef8786fc537928d12f79797bf4ea7c7a8
1 parent
10728c1b
1.GpsCacheData中HashMap替换成并发Map
Showing
2 changed files
with
393 additions
and
331 deletions
src/main/java/com/bsth/data/gpsdata_v2/DataHandleProcess.java
| 1 | -package com.bsth.data.gpsdata_v2; | |
| 2 | - | |
| 3 | -import com.alibaba.fastjson.JSON; | |
| 4 | -import com.bsth.data.gpsdata_v2.cache.GpsCacheData; | |
| 5 | -import com.bsth.data.gpsdata_v2.entity.GpsEntity; | |
| 6 | -import com.bsth.data.gpsdata_v2.handlers.*; | |
| 7 | -import com.google.common.collect.ArrayListMultimap; | |
| 8 | -import org.apache.commons.lang3.StringUtils; | |
| 9 | -import org.slf4j.Logger; | |
| 10 | -import org.slf4j.LoggerFactory; | |
| 11 | -import org.springframework.beans.factory.annotation.Autowired; | |
| 12 | -import org.springframework.stereotype.Component; | |
| 13 | - | |
| 14 | -import java.util.*; | |
| 15 | -import java.util.concurrent.CountDownLatch; | |
| 16 | -import java.util.concurrent.ExecutorService; | |
| 17 | -import java.util.concurrent.Executors; | |
| 18 | - | |
| 19 | -/** | |
| 20 | - * 实时信号数据处理 | |
| 21 | - * Created by panzhao on 2017/11/15. | |
| 22 | - */ | |
| 23 | -@Component | |
| 24 | -public class DataHandleProcess { | |
| 25 | - | |
| 26 | - @Autowired | |
| 27 | - GpsStateProcess gpsStateProcess; | |
| 28 | - @Autowired | |
| 29 | - StationInsideProcess stationInsideProcess; | |
| 30 | - @Autowired | |
| 31 | - AbnormalStateProcess abnormalStateProcess; | |
| 32 | - @Autowired | |
| 33 | - InStationProcess inStationProcess; | |
| 34 | - @Autowired | |
| 35 | - OutStationProcess outStationProcess; | |
| 36 | - @Autowired | |
| 37 | - ReverseRouteProcess reverseRouteProcess; | |
| 38 | - @Autowired | |
| 39 | - GpsRealData gpsRealData; | |
| 40 | - | |
| 41 | - | |
| 42 | - static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class); | |
| 43 | - | |
| 44 | - final static int POOL_SIZE = 9; | |
| 45 | - | |
| 46 | - static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1); | |
| 47 | - public static CountDownLatch count; | |
| 48 | - | |
| 49 | - static long lastTime; | |
| 50 | - | |
| 51 | - public static boolean isBlock() { | |
| 52 | - return System.currentTimeMillis() - lastTime > 1000 * 30; | |
| 53 | - } | |
| 54 | - | |
| 55 | - public void handle(List<GpsEntity> list) { | |
| 56 | - try { | |
| 57 | - if (list.size() == 0) | |
| 58 | - return; | |
| 59 | - lastTime = System.currentTimeMillis(); | |
| 60 | - //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑) | |
| 61 | - ArrayListMultimap multimap = ArrayListMultimap.create(); | |
| 62 | - for (GpsEntity gps : list) { | |
| 63 | - multimap.put(gps.getDeviceId(), gps); | |
| 64 | - } | |
| 65 | - List<String> deviceList = new ArrayList<>(multimap.keySet()); | |
| 66 | - | |
| 67 | - //数据均分给线程 | |
| 68 | - ArrayListMultimap dataListMap = ArrayListMultimap.create(); | |
| 69 | - int size = deviceList.size(), threadIndex = 0, threadSize = size / POOL_SIZE; | |
| 70 | - if(threadSize==0) | |
| 71 | - threadSize = size; | |
| 72 | - for (int i = 0; i < size; i++) { | |
| 73 | - if (i % threadSize == 0) | |
| 74 | - threadIndex++; | |
| 75 | - dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i))); | |
| 76 | - } | |
| 77 | - Set<Integer> ks = dataListMap.keySet(); | |
| 78 | - logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size()); | |
| 79 | - count = new CountDownLatch(ks.size()); | |
| 80 | - | |
| 81 | - logger.info(JSON.toJSONString(ks)); | |
| 82 | - for (Integer index : ks) { | |
| 83 | - threadPool.execute(new SignalHandleThread(dataListMap.get(index), count)); | |
| 84 | - } | |
| 85 | - | |
| 86 | - | |
| 87 | - //等待子线程结束 | |
| 88 | - count.await(); | |
| 89 | - | |
| 90 | - //加入实时gps对照 | |
| 91 | - for (GpsEntity gps : list) | |
| 92 | - gpsRealData.put(gps); | |
| 93 | - | |
| 94 | - logger.info("time , " + (System.currentTimeMillis() - lastTime)); | |
| 95 | - } catch (Exception e) { | |
| 96 | - logger.error("", e); | |
| 97 | - } | |
| 98 | - } | |
| 99 | - | |
| 100 | - static GpsComp comp = new GpsComp(); | |
| 101 | - | |
| 102 | - public class SignalHandleThread implements Runnable { | |
| 103 | - | |
| 104 | - List<GpsEntity> list; | |
| 105 | - CountDownLatch count; | |
| 106 | - | |
| 107 | - SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) { | |
| 108 | - this.list = gpsList; | |
| 109 | - this.count = count; | |
| 110 | - } | |
| 111 | - | |
| 112 | - @Override | |
| 113 | - public void run() { | |
| 114 | - try { | |
| 115 | - Collections.sort(list, comp); | |
| 116 | - GpsEntity gps; | |
| 117 | - for(int i = 0,len = list.size(); i< len ;i ++){ | |
| 118 | - gps = list.get(i); | |
| 119 | - | |
| 120 | - try { | |
| 121 | - if (StringUtils.isEmpty(gps.getNbbm())) | |
| 122 | - continue; | |
| 123 | - if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20) | |
| 124 | - continue; | |
| 125 | - | |
| 126 | - gpsStateProcess.process(gps);//状态处理 | |
| 127 | - stationInsideProcess.process(gps);//场站内外判定 | |
| 128 | - reverseRouteProcess.process(gps);//反向路由处理 | |
| 129 | - abnormalStateProcess.process(gps);//超速越界 | |
| 130 | - | |
| 131 | - inStationProcess.process(gps);//进站 | |
| 132 | - outStationProcess.process(gps);//出站 | |
| 133 | - | |
| 134 | - | |
| 135 | - GpsCacheData.putGps(gps);//历史gps缓存 | |
| 136 | - } catch (Throwable e) { | |
| 137 | - logger.error("", e); | |
| 138 | - } | |
| 139 | - } | |
| 140 | - } finally { | |
| 141 | - if (count != null) | |
| 142 | - count.countDown(); | |
| 143 | - } | |
| 144 | - } | |
| 145 | - } | |
| 146 | - | |
| 147 | - public static class GpsComp implements Comparator<GpsEntity> { | |
| 148 | - | |
| 149 | - @Override | |
| 150 | - public int compare(GpsEntity g1, GpsEntity g2) { | |
| 151 | - return g1.getTimestamp().compareTo(g2.getTimestamp()); | |
| 152 | - } | |
| 153 | - } | |
| 1 | +package com.bsth.data.gpsdata_v2; | |
| 2 | + | |
| 3 | +import com.alibaba.fastjson.JSON; | |
| 4 | +import com.bsth.data.gpsdata_v2.cache.GpsCacheData; | |
| 5 | +import com.bsth.data.gpsdata_v2.entity.GpsEntity; | |
| 6 | +import com.bsth.data.gpsdata_v2.handlers.*; | |
| 7 | +import com.bsth.email.SendEmailController; | |
| 8 | +import com.bsth.email.entity.EmailBean; | |
| 9 | +import com.google.common.collect.ArrayListMultimap; | |
| 10 | +import org.apache.commons.lang3.StringUtils; | |
| 11 | +import org.slf4j.Logger; | |
| 12 | +import org.slf4j.LoggerFactory; | |
| 13 | +import org.springframework.beans.factory.annotation.Autowired; | |
| 14 | +import org.springframework.stereotype.Component; | |
| 15 | + | |
| 16 | +import java.util.*; | |
| 17 | +import java.util.concurrent.CountDownLatch; | |
| 18 | +import java.util.concurrent.ExecutorService; | |
| 19 | +import java.util.concurrent.Executors; | |
| 20 | +import java.util.concurrent.ThreadFactory; | |
| 21 | +import java.util.concurrent.TimeUnit; | |
| 22 | + | |
| 23 | +/** | |
| 24 | + * 实时信号数据处理 | |
| 25 | + * Created by panzhao on 2017/11/15. | |
| 26 | + */ | |
| 27 | +@Component | |
| 28 | +public class DataHandleProcess { | |
| 29 | + | |
| 30 | + @Autowired | |
| 31 | + GpsStateProcess gpsStateProcess; | |
| 32 | + @Autowired | |
| 33 | + StationInsideProcess stationInsideProcess; | |
| 34 | + @Autowired | |
| 35 | + AbnormalStateProcess abnormalStateProcess; | |
| 36 | + @Autowired | |
| 37 | + InStationProcess inStationProcess; | |
| 38 | + @Autowired | |
| 39 | + OutStationProcess outStationProcess; | |
| 40 | + @Autowired | |
| 41 | + ReverseRouteProcess reverseRouteProcess; | |
| 42 | + @Autowired | |
| 43 | + GpsRealData gpsRealData; | |
| 44 | + // 发送邮件 | |
| 45 | + @Autowired | |
| 46 | + private SendEmailController sendEmailController; | |
| 47 | + | |
| 48 | + | |
| 49 | + static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class); | |
| 50 | + | |
| 51 | + final static int POOL_SIZE = 20; | |
| 52 | + | |
| 53 | + static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() { | |
| 54 | + | |
| 55 | + @Override | |
| 56 | + public Thread newThread(Runnable r) { | |
| 57 | + // TODO Auto-generated method stub | |
| 58 | + Thread t = new Thread(r); | |
| 59 | + t.setName("GPSProcessor"); | |
| 60 | + | |
| 61 | + return t; | |
| 62 | + } | |
| 63 | + | |
| 64 | + }); | |
| 65 | + public static CountDownLatch count; | |
| 66 | + | |
| 67 | + static long lastTime; | |
| 68 | + | |
| 69 | + public static boolean isBlock() { | |
| 70 | + return System.currentTimeMillis() - lastTime > 1000 * 30; | |
| 71 | + } | |
| 72 | + | |
| 73 | + private void shutdownAndAwaitTermination(ExecutorService pool) { | |
| 74 | + pool.shutdown(); | |
| 75 | + try { | |
| 76 | + if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS)) { | |
| 77 | + pool.shutdownNow(); | |
| 78 | + } | |
| 79 | + if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS)) { | |
| 80 | + logger.error("线程池无法正常终止"); | |
| 81 | + } | |
| 82 | + } catch (InterruptedException e) { | |
| 83 | + pool.shutdown(); | |
| 84 | + Thread.currentThread().interrupt(); | |
| 85 | + } | |
| 86 | + } | |
| 87 | + | |
| 88 | + public void handle(List<GpsEntity> list) { | |
| 89 | + try { | |
| 90 | + if (list.size() == 0) | |
| 91 | + return; | |
| 92 | + lastTime = System.currentTimeMillis(); | |
| 93 | + //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑) | |
| 94 | + ArrayListMultimap multimap = ArrayListMultimap.create(); | |
| 95 | + for (GpsEntity gps : list) { | |
| 96 | + multimap.put(gps.getDeviceId(), gps); | |
| 97 | + } | |
| 98 | + List<String> deviceList = new ArrayList<>(multimap.keySet()); | |
| 99 | + | |
| 100 | + //数据均分给线程 | |
| 101 | + ArrayListMultimap dataListMap = ArrayListMultimap.create(); | |
| 102 | + int size = deviceList.size(), threadIndex = 0, threadSize = size / POOL_SIZE; | |
| 103 | + if(threadSize==0) | |
| 104 | + threadSize = size; | |
| 105 | + for (int i = 0; i < size; i++) { | |
| 106 | + if (i % threadSize == 0) | |
| 107 | + threadIndex++; | |
| 108 | + dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i))); | |
| 109 | + } | |
| 110 | + Set<Integer> ks = dataListMap.keySet(); | |
| 111 | + logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size()); | |
| 112 | + count = new CountDownLatch(ks.size()); | |
| 113 | + | |
| 114 | + logger.info(JSON.toJSONString(ks)); | |
| 115 | + for (Integer index : ks) { | |
| 116 | + threadPool.execute(new SignalHandleThread(dataListMap.get(index), count)); | |
| 117 | + } | |
| 118 | + | |
| 119 | + | |
| 120 | + //等待子线程结束 | |
| 121 | + boolean isNormal = count.await(5, TimeUnit.SECONDS); | |
| 122 | + if (!isNormal) { | |
| 123 | + try { | |
| 124 | + shutdownAndAwaitTermination(threadPool); | |
| 125 | + threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() { | |
| 126 | + | |
| 127 | + @Override | |
| 128 | + public Thread newThread(Runnable r) { | |
| 129 | + // TODO Auto-generated method stub | |
| 130 | + Thread t = new Thread(r); | |
| 131 | + t.setName("GPSProcessor"); | |
| 132 | + | |
| 133 | + return t; | |
| 134 | + } | |
| 135 | + | |
| 136 | + }); | |
| 137 | + //发送邮件 | |
| 138 | + EmailBean mail = new EmailBean(); | |
| 139 | + mail.setSubject("线调GPS处理"); | |
| 140 | + mail.setContent("GPS处理超时,检查线程栈文件信息<br/>"); | |
| 141 | + sendEmailController.sendMail("113252620@qq.com", mail); | |
| 142 | + logger.info("DataHandlerProcess:邮件发送成功!"); | |
| 143 | + } catch (Exception e){ | |
| 144 | + logger.error("DataHandlerProcess:邮件发送失败!",e); | |
| 145 | + } | |
| 146 | + } | |
| 147 | + | |
| 148 | + //加入实时gps对照 | |
| 149 | + for (GpsEntity gps : list) | |
| 150 | + gpsRealData.put(gps); | |
| 151 | + | |
| 152 | + logger.info("time , " + (System.currentTimeMillis() - lastTime)); | |
| 153 | + } catch (Exception e) { | |
| 154 | + logger.error("", e); | |
| 155 | + } | |
| 156 | + } | |
| 157 | + | |
| 158 | + static GpsComp comp = new GpsComp(); | |
| 159 | + | |
| 160 | + public class SignalHandleThread implements Runnable { | |
| 161 | + | |
| 162 | + List<GpsEntity> list; | |
| 163 | + CountDownLatch count; | |
| 164 | + | |
| 165 | + SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) { | |
| 166 | + this.list = gpsList; | |
| 167 | + this.count = count; | |
| 168 | + } | |
| 169 | + | |
| 170 | + @Override | |
| 171 | + public void run() { | |
| 172 | + try { | |
| 173 | + Collections.sort(list, comp); | |
| 174 | + GpsEntity gps; | |
| 175 | + for(int i = 0,len = list.size(); i< len ;i ++){ | |
| 176 | + if (Thread.currentThread().isInterrupted()) break; | |
| 177 | + gps = list.get(i); | |
| 178 | + | |
| 179 | + try { | |
| 180 | + if (StringUtils.isEmpty(gps.getNbbm())) | |
| 181 | + continue; | |
| 182 | + if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20) | |
| 183 | + continue; | |
| 184 | + | |
| 185 | + gpsStateProcess.process(gps);//状态处理 | |
| 186 | + stationInsideProcess.process(gps);//场站内外判定 | |
| 187 | + reverseRouteProcess.process(gps);//反向路由处理 | |
| 188 | + abnormalStateProcess.process(gps);//超速越界 | |
| 189 | + | |
| 190 | + inStationProcess.process(gps);//进站 | |
| 191 | + outStationProcess.process(gps);//出站 | |
| 192 | + | |
| 193 | + | |
| 194 | + GpsCacheData.putGps(gps);//历史gps缓存 | |
| 195 | + } catch (Throwable e) { | |
| 196 | + logger.error("", e); | |
| 197 | + } | |
| 198 | + } | |
| 199 | + } finally { | |
| 200 | + if (count != null) | |
| 201 | + count.countDown(); | |
| 202 | + } | |
| 203 | + } | |
| 204 | + } | |
| 205 | + | |
| 206 | + public static class GpsComp implements Comparator<GpsEntity> { | |
| 207 | + | |
| 208 | + @Override | |
| 209 | + public int compare(GpsEntity g1, GpsEntity g2) { | |
| 210 | + return g1.getTimestamp().compareTo(g2.getTimestamp()); | |
| 211 | + } | |
| 212 | + } | |
| 154 | 213 | } |
| 155 | 214 | \ No newline at end of file | ... | ... |
src/main/java/com/bsth/data/gpsdata_v2/cache/GpsCacheData.java
| 1 | -package com.bsth.data.gpsdata_v2.cache; | |
| 2 | - | |
| 3 | -import com.bsth.data.gpsdata_v2.entity.GpsEntity; | |
| 4 | -import com.bsth.data.gpsdata_v2.entity.StationRoute; | |
| 5 | -import com.bsth.data.gpsdata_v2.entity.trail.GpsExecTrail; | |
| 6 | -import com.bsth.data.gpsdata_v2.utils.CircleQueue; | |
| 7 | -import com.google.common.collect.ArrayListMultimap; | |
| 8 | -import org.slf4j.Logger; | |
| 9 | -import org.slf4j.LoggerFactory; | |
| 10 | - | |
| 11 | -import java.util.*; | |
| 12 | -import java.util.concurrent.ConcurrentHashMap; | |
| 13 | -import java.util.concurrent.ConcurrentMap; | |
| 14 | - | |
| 15 | -/** | |
| 16 | - * gps 数据缓存 | |
| 17 | - * Created by panzhao on 2017/11/15. | |
| 18 | - */ | |
| 19 | -public class GpsCacheData { | |
| 20 | - | |
| 21 | - /** | |
| 22 | - * 每辆车缓存最后200条gps | |
| 23 | - */ | |
| 24 | - private static final int CACHE_SIZE = 200; | |
| 25 | - private static ConcurrentMap<String, CircleQueue<GpsEntity>> gpsCacheMap = new ConcurrentHashMap<>(); | |
| 26 | - | |
| 27 | - /** | |
| 28 | - * 车辆执行班次的详细 进出站数据 | |
| 29 | - */ | |
| 30 | - private static ArrayListMultimap<String, GpsExecTrail> trailListMultimap = ArrayListMultimap.create(); | |
| 31 | - | |
| 32 | - static Logger logger = LoggerFactory.getLogger(GpsCacheData.class); | |
| 33 | - | |
| 34 | - public static CircleQueue<GpsEntity> getGps(String device) { | |
| 35 | - return gpsCacheMap.get(device); | |
| 36 | - } | |
| 37 | - | |
| 38 | - /** | |
| 39 | - * 清除车辆的轨迹数据 | |
| 40 | - * @param nbbm | |
| 41 | - */ | |
| 42 | - public static void remove(String nbbm){ | |
| 43 | - //logger.info("清除车辆到离站轨迹, " + nbbm); | |
| 44 | - trailListMultimap.removeAll(nbbm); | |
| 45 | - } | |
| 46 | - | |
| 47 | - public static GpsExecTrail gpsExecTrail(String nbbm){ | |
| 48 | - List<GpsExecTrail> list = trailListMultimap.get(nbbm); | |
| 49 | - | |
| 50 | - GpsExecTrail trail; | |
| 51 | - if(null == list || list.size() == 0 | |
| 52 | - || list.get(list.size() - 1).isEnd()){ | |
| 53 | - trail = new GpsExecTrail(); | |
| 54 | - | |
| 55 | - trailListMultimap.put(nbbm, trail); | |
| 56 | - } | |
| 57 | - else{ | |
| 58 | - trail = list.get(list.size() - 1); | |
| 59 | - } | |
| 60 | - | |
| 61 | - return trail; | |
| 62 | - } | |
| 63 | - | |
| 64 | - public static void out(GpsEntity gps){ | |
| 65 | - GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | |
| 66 | - trail.getSrs().add(gps); | |
| 67 | - | |
| 68 | - } | |
| 69 | - | |
| 70 | - public static void in(GpsEntity gps, boolean end){ | |
| 71 | - GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | |
| 72 | - trail.getSrs().add(gps); | |
| 73 | - trail.setEnd(end); | |
| 74 | - } | |
| 75 | - | |
| 76 | - public static GpsEntity getPrev(GpsEntity gps){ | |
| 77 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | |
| 78 | - if(queue != null) | |
| 79 | - return queue.getTail(); | |
| 80 | - return null; | |
| 81 | - } | |
| 82 | - | |
| 83 | - public static void putGps(GpsEntity gps) { | |
| 84 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | |
| 85 | - if (queue == null) { | |
| 86 | - queue = new CircleQueue<>(CACHE_SIZE); | |
| 87 | - gpsCacheMap.put(gps.getDeviceId(), queue); | |
| 88 | - } | |
| 89 | - queue.add(gps); | |
| 90 | - } | |
| 91 | - | |
| 92 | - public static void clear(String deviceId) { | |
| 93 | - try { | |
| 94 | - CircleQueue<GpsEntity> queue = gpsCacheMap.get(deviceId); | |
| 95 | - if (queue != null) | |
| 96 | - queue.clear(); | |
| 97 | - } catch (Exception e) { | |
| 98 | - logger.error("", e); | |
| 99 | - } | |
| 100 | - } | |
| 101 | - | |
| 102 | - public static StationRoute prevStation(GpsEntity gps) { | |
| 103 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 104 | - if(null == trails || trails.size() == 0) | |
| 105 | - return null; | |
| 106 | - | |
| 107 | - GpsEntity prev; | |
| 108 | - for(int i = trails.size() - 1; i > 0; i--){ | |
| 109 | - prev = trails.get(i).getSrs().peekLast(); | |
| 110 | - | |
| 111 | - if(prev != null){ | |
| 112 | - return GeoCacheData.getRouteCode(prev); | |
| 113 | - } | |
| 114 | - } | |
| 115 | - return null; | |
| 116 | - } | |
| 117 | - | |
| 118 | - /** | |
| 119 | - * 最后一段轨迹 | |
| 120 | - * @param gps | |
| 121 | - * @return | |
| 122 | - */ | |
| 123 | - public static int lastInTrailsSize(GpsEntity gps) { | |
| 124 | - //int size = 0; | |
| 125 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 126 | - if(null == trails || trails.size() == 0) | |
| 127 | - return 0; | |
| 128 | - | |
| 129 | - GpsExecTrail gs = trails.get(trails.size() - 1); | |
| 130 | - if(gs.isEnd()) | |
| 131 | - return 0; | |
| 132 | - | |
| 133 | - Set<String> set = new HashSet<>(); | |
| 134 | - for(GpsEntity g : gs.getSrs()){ | |
| 135 | - if(g.getInstation() == 1) | |
| 136 | - set.add(g.getStation().getName()); | |
| 137 | - } | |
| 138 | - return set.size(); | |
| 139 | - } | |
| 140 | - | |
| 141 | - public static List<StationRoute> prevMultiStation(GpsEntity gps) { | |
| 142 | - List<StationRoute> rs = new ArrayList<>(); | |
| 143 | - List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 144 | - if(null == trails || trails.size() == 0) | |
| 145 | - return null; | |
| 146 | - | |
| 147 | - for(int i = trails.size() - 1; i > 0; i--){ | |
| 148 | - rs.addAll(searchLinked(trails.get(i).getSrs(), gps.getUpDown())); | |
| 149 | - if(rs.size() > 3) | |
| 150 | - break; | |
| 151 | - } | |
| 152 | - return rs; | |
| 153 | - } | |
| 154 | - | |
| 155 | - private static List<StationRoute> searchLinked(LinkedList<GpsEntity> list, int upDown){ | |
| 156 | - List<StationRoute> rs = new ArrayList<>(); | |
| 157 | - | |
| 158 | - GpsEntity gps; | |
| 159 | - int prevCode=0; | |
| 160 | - for(int i = list.size() - 1; i > 0; i --){ | |
| 161 | - gps = list.get(i); | |
| 162 | - if(gps.getInstation()!=1) | |
| 163 | - continue; | |
| 164 | - | |
| 165 | - if(gps.getUpDown() != upDown) | |
| 166 | - break; | |
| 167 | - | |
| 168 | - if(gps.getStation().getRouteSort() != prevCode) | |
| 169 | - rs.add(gps.getStation()); | |
| 170 | - | |
| 171 | - prevCode = gps.getStation().getRouteSort(); | |
| 172 | - | |
| 173 | - if(rs.size() >= 3) | |
| 174 | - break; | |
| 175 | - } | |
| 176 | - return rs; | |
| 177 | - } | |
| 178 | -} | |
| 1 | +package com.bsth.data.gpsdata_v2.cache; | |
| 2 | + | |
| 3 | +import com.bsth.data.gpsdata_v2.entity.GpsEntity; | |
| 4 | +import com.bsth.data.gpsdata_v2.entity.StationRoute; | |
| 5 | +import com.bsth.data.gpsdata_v2.entity.trail.GpsExecTrail; | |
| 6 | +import com.bsth.data.gpsdata_v2.utils.CircleQueue; | |
| 7 | +import com.google.common.collect.ArrayListMultimap; | |
| 8 | +import org.slf4j.Logger; | |
| 9 | +import org.slf4j.LoggerFactory; | |
| 10 | + | |
| 11 | +import java.util.*; | |
| 12 | +import java.util.concurrent.ConcurrentHashMap; | |
| 13 | +import java.util.concurrent.ConcurrentMap; | |
| 14 | +import java.util.concurrent.CopyOnWriteArrayList; | |
| 15 | + | |
| 16 | +/** | |
| 17 | + * gps 数据缓存 | |
| 18 | + * Created by panzhao on 2017/11/15. | |
| 19 | + */ | |
| 20 | +public class GpsCacheData { | |
| 21 | + | |
| 22 | + /** | |
| 23 | + * 每辆车缓存最后200条gps | |
| 24 | + */ | |
| 25 | + private static final int CACHE_SIZE = 200; | |
| 26 | + private static ConcurrentMap<String, CircleQueue<GpsEntity>> gpsCacheMap = new ConcurrentHashMap<>(); | |
| 27 | + | |
| 28 | + /** | |
| 29 | + * 车辆执行班次的详细 进出站数据 | |
| 30 | + */ | |
| 31 | + //private static ArrayListMultimap<String, GpsExecTrail> trailListMultimap = ArrayListMultimap.create(); | |
| 32 | + private static Map<String, List<GpsExecTrail>> trailListMultimap = new ConcurrentHashMap<>(); | |
| 33 | + | |
| 34 | + static Logger logger = LoggerFactory.getLogger(GpsCacheData.class); | |
| 35 | + | |
| 36 | + public static CircleQueue<GpsEntity> getGps(String device) { | |
| 37 | + return gpsCacheMap.get(device); | |
| 38 | + } | |
| 39 | + | |
| 40 | + /** | |
| 41 | + * 清除车辆的轨迹数据 | |
| 42 | + * @param nbbm | |
| 43 | + */ | |
| 44 | + public static void remove(String nbbm){ | |
| 45 | + //logger.info("清除车辆到离站轨迹, " + nbbm); | |
| 46 | + trailListMultimap.remove(nbbm); | |
| 47 | + } | |
| 48 | + | |
| 49 | + public static GpsExecTrail gpsExecTrail(String nbbm){ | |
| 50 | + List<GpsExecTrail> list = trailListMultimap.get(nbbm); | |
| 51 | + | |
| 52 | + GpsExecTrail trail; | |
| 53 | + if (null == list || list.size() == 0 || list.get(list.size() - 1).isEnd()) { | |
| 54 | + if (null == list) { | |
| 55 | + list = new CopyOnWriteArrayList<>(); | |
| 56 | + trailListMultimap.put(nbbm, list); | |
| 57 | + } | |
| 58 | + trail = new GpsExecTrail(); | |
| 59 | + list.add(trail); | |
| 60 | + } else{ | |
| 61 | + trail = list.get(list.size() - 1); | |
| 62 | + } | |
| 63 | + | |
| 64 | + return trail; | |
| 65 | + } | |
| 66 | + | |
| 67 | + public static void out(GpsEntity gps){ | |
| 68 | + GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | |
| 69 | + trail.getSrs().add(gps); | |
| 70 | + | |
| 71 | + } | |
| 72 | + | |
| 73 | + public static void in(GpsEntity gps, boolean end){ | |
| 74 | + GpsExecTrail trail = gpsExecTrail(gps.getNbbm()); | |
| 75 | + trail.getSrs().add(gps); | |
| 76 | + trail.setEnd(end); | |
| 77 | + } | |
| 78 | + | |
| 79 | + public static GpsEntity getPrev(GpsEntity gps){ | |
| 80 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | |
| 81 | + if(queue != null) | |
| 82 | + return queue.getTail(); | |
| 83 | + return null; | |
| 84 | + } | |
| 85 | + | |
| 86 | + public static void putGps(GpsEntity gps) { | |
| 87 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(gps.getDeviceId()); | |
| 88 | + if (queue == null) { | |
| 89 | + queue = new CircleQueue<>(CACHE_SIZE); | |
| 90 | + gpsCacheMap.put(gps.getDeviceId(), queue); | |
| 91 | + } | |
| 92 | + queue.add(gps); | |
| 93 | + } | |
| 94 | + | |
| 95 | + public static void clear(String deviceId) { | |
| 96 | + try { | |
| 97 | + CircleQueue<GpsEntity> queue = gpsCacheMap.get(deviceId); | |
| 98 | + if (queue != null) | |
| 99 | + queue.clear(); | |
| 100 | + } catch (Exception e) { | |
| 101 | + logger.error("", e); | |
| 102 | + } | |
| 103 | + } | |
| 104 | + | |
| 105 | + public static StationRoute prevStation(GpsEntity gps) { | |
| 106 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 107 | + if(null == trails || trails.size() == 0) | |
| 108 | + return null; | |
| 109 | + | |
| 110 | + GpsEntity prev; | |
| 111 | + for(int i = trails.size() - 1; i > 0; i--){ | |
| 112 | + prev = trails.get(i).getSrs().peekLast(); | |
| 113 | + | |
| 114 | + if(prev != null){ | |
| 115 | + return GeoCacheData.getRouteCode(prev); | |
| 116 | + } | |
| 117 | + } | |
| 118 | + return null; | |
| 119 | + } | |
| 120 | + | |
| 121 | + /** | |
| 122 | + * 最后一段轨迹 | |
| 123 | + * @param gps | |
| 124 | + * @return | |
| 125 | + */ | |
| 126 | + public static int lastInTrailsSize(GpsEntity gps) { | |
| 127 | + //int size = 0; | |
| 128 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 129 | + if(null == trails || trails.size() == 0) | |
| 130 | + return 0; | |
| 131 | + | |
| 132 | + GpsExecTrail gs = trails.get(trails.size() - 1); | |
| 133 | + if(gs.isEnd()) | |
| 134 | + return 0; | |
| 135 | + | |
| 136 | + Set<String> set = new HashSet<>(); | |
| 137 | + for(GpsEntity g : gs.getSrs()){ | |
| 138 | + if(g.getInstation() == 1) | |
| 139 | + set.add(g.getStation().getName()); | |
| 140 | + } | |
| 141 | + return set.size(); | |
| 142 | + } | |
| 143 | + | |
| 144 | + public static List<StationRoute> prevMultiStation(GpsEntity gps) { | |
| 145 | + List<StationRoute> rs = new ArrayList<>(); | |
| 146 | + List<GpsExecTrail> trails = trailListMultimap.get(gps.getNbbm()); | |
| 147 | + if(null == trails || trails.size() == 0) | |
| 148 | + return null; | |
| 149 | + | |
| 150 | + for(int i = trails.size() - 1; i > 0; i--){ | |
| 151 | + rs.addAll(searchLinked(trails.get(i).getSrs(), gps.getUpDown())); | |
| 152 | + if(rs.size() > 3) | |
| 153 | + break; | |
| 154 | + } | |
| 155 | + return rs; | |
| 156 | + } | |
| 157 | + | |
| 158 | + private static List<StationRoute> searchLinked(LinkedList<GpsEntity> list, int upDown){ | |
| 159 | + List<StationRoute> rs = new ArrayList<>(); | |
| 160 | + | |
| 161 | + GpsEntity gps; | |
| 162 | + int prevCode=0; | |
| 163 | + for(int i = list.size() - 1; i > 0; i --){ | |
| 164 | + gps = list.get(i); | |
| 165 | + if(gps.getInstation()!=1) | |
| 166 | + continue; | |
| 167 | + | |
| 168 | + if(gps.getUpDown() != upDown) | |
| 169 | + break; | |
| 170 | + | |
| 171 | + if(gps.getStation().getRouteSort() != prevCode) | |
| 172 | + rs.add(gps.getStation()); | |
| 173 | + | |
| 174 | + prevCode = gps.getStation().getRouteSort(); | |
| 175 | + | |
| 176 | + if(rs.size() >= 3) | |
| 177 | + break; | |
| 178 | + } | |
| 179 | + return rs; | |
| 180 | + } | |
| 181 | +} | ... | ... |