Commit d06bd98b24ec44c4270b60412dfc7f5b7f607575
1 parent
3f23f994
update...
Showing
4 changed files
with
49 additions
and
37 deletions
src/main/java/com/bsth/client/GpsBeforeBuffer.java
| ... | ... | @@ -10,12 +10,8 @@ import org.springframework.core.annotation.Order; |
| 10 | 10 | import org.springframework.stereotype.Component; |
| 11 | 11 | |
| 12 | 12 | import java.util.ArrayList; |
| 13 | -import java.util.LinkedList; | |
| 14 | 13 | import java.util.List; |
| 15 | -import java.util.concurrent.Executors; | |
| 16 | -import java.util.concurrent.ScheduledExecutorService; | |
| 17 | -import java.util.concurrent.ThreadFactory; | |
| 18 | -import java.util.concurrent.TimeUnit; | |
| 14 | +import java.util.concurrent.*; | |
| 19 | 15 | |
| 20 | 16 | /** |
| 21 | 17 | * 从 socket client 到调度系统 的缓冲 |
| ... | ... | @@ -25,14 +21,23 @@ import java.util.concurrent.TimeUnit; |
| 25 | 21 | @Order(2) |
| 26 | 22 | public class GpsBeforeBuffer implements CommandLineRunner{ |
| 27 | 23 | |
| 28 | - static LinkedList<GpsEntity> linkedList = new LinkedList(); | |
| 24 | + static ConcurrentLinkedQueue<GpsEntity> linkedList = new ConcurrentLinkedQueue(); | |
| 29 | 25 | static final int MAX_SIZE = 4000 * 20; |
| 26 | + static int size = 0; | |
| 30 | 27 | |
| 31 | 28 | public void put(GpsEntity gps){ |
| 32 | - if(gps == null || StringUtils.isBlank(gps.getDeviceId())) | |
| 33 | - return; | |
| 29 | + try { | |
| 30 | + if(gps == null) | |
| 31 | + return; | |
| 32 | + | |
| 33 | + if(StringUtils.isBlank(gps.getDeviceId())) | |
| 34 | + return; | |
| 34 | 35 | |
| 35 | - linkedList.add(gps); | |
| 36 | + linkedList.add(gps); | |
| 37 | + size++; | |
| 38 | + }catch (Exception e){ | |
| 39 | + log.error("", e); | |
| 40 | + } | |
| 36 | 41 | } |
| 37 | 42 | |
| 38 | 43 | static Logger log = LoggerFactory.getLogger(GpsBeforeBuffer.class); |
| ... | ... | @@ -40,23 +45,17 @@ public class GpsBeforeBuffer implements CommandLineRunner{ |
| 40 | 45 | public static List<GpsEntity> pollAll(){ |
| 41 | 46 | List<GpsEntity> rs = new ArrayList<>(300); |
| 42 | 47 | GpsEntity gps; |
| 43 | - int size = linkedList.size(); | |
| 44 | - for(int j = 0; j < size; j++){ | |
| 45 | - gps = linkedList.poll(); | |
| 46 | - if(gps == null){ | |
| 47 | - log.error("linkedList poll gps is null ???"); | |
| 48 | - } | |
| 49 | - rs.add(gps); | |
| 50 | - } | |
| 51 | - /*while (true){ | |
| 48 | + | |
| 49 | + while (true){ | |
| 52 | 50 | gps = linkedList.poll(); |
| 53 | 51 | if(gps == null){ |
| 52 | + size = 0; | |
| 54 | 53 | break; |
| 55 | 54 | } |
| 56 | 55 | rs.add(gps); |
| 57 | - }*/ | |
| 56 | + } | |
| 58 | 57 | |
| 59 | - log.info("poll size: " + rs.size()); | |
| 58 | + log.info("poll size: " + rs.size() + " -current size: " + size); | |
| 60 | 59 | return rs; |
| 61 | 60 | } |
| 62 | 61 | |
| ... | ... | @@ -64,33 +63,27 @@ public class GpsBeforeBuffer implements CommandLineRunner{ |
| 64 | 63 | * 清理数据,保持最大 MAX_SIZE 个数的元素 |
| 65 | 64 | */ |
| 66 | 65 | public static void clear(){ |
| 67 | - int size = linkedList.size(); | |
| 68 | 66 | if(size <= MAX_SIZE) |
| 69 | 67 | return; |
| 70 | - | |
| 71 | 68 | int len = size - MAX_SIZE; |
| 72 | 69 | for(int j = 0; j < len; j++){ |
| 73 | 70 | linkedList.poll(); |
| 71 | + size--; | |
| 74 | 72 | } |
| 75 | - log.info("clear size: " + len); | |
| 73 | + log.info("clear size: " + len + " -current size: " + size); | |
| 76 | 74 | } |
| 77 | 75 | |
| 78 | 76 | @Autowired |
| 79 | 77 | BufferSizeCheck bufferSizeCheck; |
| 78 | + @Autowired | |
| 79 | + SizeCheck sizeCheck; | |
| 80 | 80 | |
| 81 | - ScheduledExecutorService sexec; | |
| 81 | + ScheduledExecutorService sexec = Executors.newScheduledThreadPool(2); | |
| 82 | 82 | |
| 83 | 83 | @Override |
| 84 | 84 | public void run(String... strings) throws Exception { |
| 85 | - sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { | |
| 86 | - @Override | |
| 87 | - public Thread newThread(Runnable r) { | |
| 88 | - Thread t = new Thread(r); | |
| 89 | - t.setName("gpsBufferSizeCheck"); | |
| 90 | - return t; | |
| 91 | - } | |
| 92 | - }); | |
| 93 | 85 | sexec.scheduleWithFixedDelay(bufferSizeCheck, 60, 30, TimeUnit.SECONDS); |
| 86 | + sexec.scheduleWithFixedDelay(sizeCheck, 60, 60 * 5, TimeUnit.SECONDS); | |
| 94 | 87 | } |
| 95 | 88 | |
| 96 | 89 | @Component |
| ... | ... | @@ -101,4 +94,13 @@ public class GpsBeforeBuffer implements CommandLineRunner{ |
| 101 | 94 | GpsBeforeBuffer.clear(); |
| 102 | 95 | } |
| 103 | 96 | } |
| 97 | + | |
| 98 | + @Component | |
| 99 | + public static class SizeCheck extends Thread{ | |
| 100 | + | |
| 101 | + @Override | |
| 102 | + public void run() { | |
| 103 | + log.info("[SizeCheck] linkedList real size: " + linkedList.size() + " -current size: " + size); | |
| 104 | + } | |
| 105 | + } | |
| 104 | 106 | } |
| 105 | 107 | \ No newline at end of file | ... | ... |
src/main/java/com/bsth/client/pd/handler/PdClientHandler.java
| ... | ... | @@ -73,11 +73,17 @@ public class PdClientHandler extends IoHandlerAdapter{ |
| 73 | 73 | } |
| 74 | 74 | else if(0x41 == msg.getCommandType()){ |
| 75 | 75 | Pd_41_0 pd41 = (Pd_41_0)msg.getMessageBody(); |
| 76 | - gpsBeforeBuffer.put(GpsEntity.getInstance(pd41.getInfo(), msg.getVersion(), 1)); | |
| 76 | + | |
| 77 | + GpsEntity gps = GpsEntity.getInstance(pd41.getInfo(), msg.getVersion(), 1); | |
| 78 | + if(gps != null) | |
| 79 | + gpsBeforeBuffer.put(gps); | |
| 77 | 80 | } |
| 78 | 81 | else if(0x42 == msg.getCommandType()){ |
| 79 | 82 | Pd_42_0 pd42 = (Pd_42_0)msg.getMessageBody(); |
| 80 | - gpsBeforeBuffer.put(GpsEntity.getInstance(pd42.getInfo(), msg.getVersion(), 1)); | |
| 83 | + | |
| 84 | + GpsEntity gps = GpsEntity.getInstance(pd42.getInfo(), msg.getVersion(), 1); | |
| 85 | + if(gps != null) | |
| 86 | + gpsBeforeBuffer.put(GpsEntity.getInstance(pd42.getInfo(), msg.getVersion(), 1)); | |
| 81 | 87 | } |
| 82 | 88 | } |
| 83 | 89 | } | ... | ... |
src/main/java/com/bsth/client/pf/handler/PfClientHandler.java
| ... | ... | @@ -65,13 +65,16 @@ public class PfClientHandler extends IoHandlerAdapter{ |
| 65 | 65 | PfMessage msg = (PfMessage)message; |
| 66 | 66 | IMessageBody body = msg.getMessageBody(); |
| 67 | 67 | if (body != null) { |
| 68 | - String deviceId = body.getDeviceId(); | |
| 68 | + //String deviceId = body.getDeviceId(); | |
| 69 | 69 | if (0x1 == msg.getCommandType()) { |
| 70 | 70 | log.debug("设备编号:" + body.getDeviceId() + "建立连接"); |
| 71 | 71 | } |
| 72 | 72 | |
| 73 | 73 | BasicInfo info = Protocol2BizUtil.getBasicInfoFromMsg(msg); |
| 74 | - gpsBeforeBuffer.put(GpsEntity.getInstance(info, msg.getVersion(), 0)); | |
| 74 | + | |
| 75 | + GpsEntity gps = GpsEntity.getInstance(info, msg.getVersion(), 0); | |
| 76 | + if(gps != null) | |
| 77 | + gpsBeforeBuffer.put(gps); | |
| 75 | 78 | } |
| 76 | 79 | |
| 77 | 80 | } | ... | ... |
src/main/java/com/bsth/entity/GpsEntity.java
| 1 | 1 | package com.bsth.entity; |
| 2 | 2 | |
| 3 | 3 | import com.bsth.client.pd.protocol.BasicInfo; |
| 4 | +import org.apache.commons.lang3.StringUtils; | |
| 4 | 5 | |
| 5 | 6 | /** |
| 6 | 7 | * @author PanZhao |
| ... | ... | @@ -77,7 +78,7 @@ public class GpsEntity { |
| 77 | 78 | public static GpsEntity getInstance(BasicInfo basicInfo, int version, int source){ |
| 78 | 79 | //放弃补发数据 |
| 79 | 80 | byte cacheData = getCacheState(basicInfo.getServiceState()); |
| 80 | - if(cacheData == 1) | |
| 81 | + if(cacheData == 1 || StringUtils.isBlank(basicInfo.getDeviceId())) | |
| 81 | 82 | return null; |
| 82 | 83 | |
| 83 | 84 | GpsEntity gps = new GpsEntity(); | ... | ... |