Commit 8ed85f4e43aeb3167e1c38bf5150d503dba85f54

Authored by 潘钊
1 parent bed43f38

update...

src/main/java/com/bsth/client/ClientApp.java
... ... @@ -18,6 +18,8 @@ import org.apache.mina.transport.socket.nio.NioSocketConnector;
18 18 import org.slf4j.Logger;
19 19 import org.slf4j.LoggerFactory;
20 20 import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.boot.CommandLineRunner;
  22 +import org.springframework.core.annotation.Order;
21 23 import org.springframework.stereotype.Component;
22 24  
23 25 import java.net.InetSocketAddress;
... ... @@ -27,7 +29,8 @@ import java.util.concurrent.*;
27 29 * Created by panzhao on 2017/5/4.
28 30 */
29 31 @Component
30   -public class ClientApp {
  32 +@Order(1)
  33 +public class ClientApp implements CommandLineRunner{
31 34  
32 35 private static NioSocketConnector pdDataConnector;
33 36 private static NioSocketConnector pfDataConnector;
... ... @@ -67,7 +70,6 @@ public class ClientApp {
67 70  
68 71 logger.info("dconnect...");
69 72 pdSession = session;
70   - //SessionManager.getInstance().register(device, session);
71 73 } catch (Exception e) {
72 74 e.printStackTrace();
73 75 }
... ... @@ -212,6 +214,12 @@ public class ClientApp {
212 214  
213 215 static IoSession pdSession;
214 216 static IoSession pfSession;
  217 +
  218 + @Override
  219 + public void run(String... strings) throws Exception {
  220 + init();
  221 + }
  222 +
215 223 final class SessionChecker implements Runnable {
216 224  
217 225 @Override
... ...
src/main/java/com/bsth/client/GpsBeforeBuffer.java
1 1 package com.bsth.client;
2 2  
3   -import com.bsth.client.pd.protocol.BasicInfo;
4 3 import com.bsth.entity.GpsEntity;
5 4 import org.apache.commons.lang3.StringUtils;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.boot.CommandLineRunner;
  9 +import org.springframework.core.annotation.Order;
6 10 import org.springframework.stereotype.Component;
7 11  
  12 +import java.util.ArrayList;
8 13 import java.util.LinkedList;
  14 +import java.util.List;
  15 +import java.util.concurrent.Executors;
  16 +import java.util.concurrent.ScheduledExecutorService;
  17 +import java.util.concurrent.ThreadFactory;
  18 +import java.util.concurrent.TimeUnit;
9 19  
10 20 /**
11   - * 从 socket client 到 .. 的缓冲
  21 + * 从 socket client 到调度系统 的缓冲
12 22 * Created by panzhao on 2017/5/4.
13 23 */
14 24 @Component
15   -public class GpsBeforeBuffer {
  25 +@Order(2)
  26 +public class GpsBeforeBuffer implements CommandLineRunner{
16 27  
17   - static LinkedList<GpsEntity> linkedList = new LinkedList<>();
  28 + static LinkedList<GpsEntity> linkedList = new LinkedList();
  29 + static final int MAX_SIZE = 4000 * 20;
18 30  
19   - public void put(BasicInfo basicInfo){
20   - //放弃补发数据
21   - byte cacheData = getCacheState(basicInfo.getServiceState());
22   - if(cacheData == 1)
23   - return;
24   - GpsEntity gps = new GpsEntity();
25   -
26   - gps.setDeviceId(basicInfo.getDeviceId());
27   - gps.setTimestamp(basicInfo.getTimestamp());
28   - gps.setLat(basicInfo.getLat());
29   - gps.setLon(basicInfo.getLon());
30   - gps.setDirection((float)basicInfo.getDirection() / 10);
31   - gps.setValid(basicInfo.getGpsValid());
32   - gps.setCompanyCode(basicInfo.getCompanyCode());
33   - gps.setStopNo(basicInfo.getStopNo());
34   - gps.setUpDown(basicInfo.getUpOrDown());
35   - gps.setSpeed((float)basicInfo.getSpeedGps() / 10);
36   - gps.setLineId(String.valueOf(basicInfo.getLineId()));
37   - gps.setState((int) getService(basicInfo.getServiceState()));
38   - //没有设备号
39   - if (StringUtils.isBlank(gps.getDeviceId()))
  31 + public void put(GpsEntity gps){
  32 + if(gps == null || StringUtils.isBlank(gps.getDeviceId()))
40 33 return;
41 34  
42   - linkedList.addLast(gps);
  35 + linkedList.add(gps);
43 36 }
44 37  
45   - public byte getCacheState(long serviceState) {
46   - return (byte)(((serviceState & 0x00100000) == 0x00100000) ? 1 : 0);
47   - }
  38 + static Logger log = LoggerFactory.getLogger(GpsBeforeBuffer.class);
48 39  
  40 + public static List<GpsEntity> pollAll(){
  41 + List<GpsEntity> rs = new ArrayList<>(300);
  42 + GpsEntity gps;
  43 + while (true){
  44 + gps = linkedList.poll();
  45 + if(gps == null){
  46 + break;
  47 + }
  48 + rs.add(gps);
  49 + }
  50 +
  51 + log.info("poll size: " + rs.size());
  52 + return rs;
  53 + }
49 54  
50 55 /**
51   - * 获取运营状态
52   - *
53   - * @return -1无效 0运营 1未运营
  56 + * 清理数据,保持最大 MAX_SIZE 个数的元素
54 57 */
55   - public static byte getService(long serviceState) {
56   - if ((serviceState & 0x00020000) == 0x00020000 || (serviceState & 0x80000000) == 0x80000000)
57   - return -1;
58   - return (byte) (((serviceState & 0x02000000) == 0x02000000) ? 1 : 0);
  58 + public static void clear(){
  59 + int size = linkedList.size();
  60 + if(size <= MAX_SIZE)
  61 + return;
  62 +
  63 + int len = size - MAX_SIZE;
  64 + for(int j = 0; j < len; j++){
  65 + linkedList.poll();
  66 + }
  67 + log.info("clear size: " + len);
  68 + }
  69 +
  70 + @Autowired
  71 + BufferSizeCheck bufferSizeCheck;
  72 +
  73 + ScheduledExecutorService sexec;
  74 +
  75 + @Override
  76 + public void run(String... strings) throws Exception {
  77 + sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
  78 + @Override
  79 + public Thread newThread(Runnable r) {
  80 + Thread t = new Thread(r);
  81 + t.setName("gpsBufferSizeCheck");
  82 + return t;
  83 + }
  84 + });
  85 + sexec.scheduleWithFixedDelay(bufferSizeCheck, 60, 30, TimeUnit.SECONDS);
  86 + }
  87 +
  88 + @Component
  89 + public static class BufferSizeCheck extends Thread {
  90 +
  91 + @Override
  92 + public void run() {
  93 + GpsBeforeBuffer.clear();
  94 + }
59 95 }
60 96 }
61 97 \ No newline at end of file
... ...
src/main/java/com/bsth/client/pd/codec/MessageDecoder.java
... ... @@ -32,7 +32,7 @@ public class MessageDecoder extends CumulativeProtocolDecoder {
32 32 out.write(msg);
33 33  
34 34 //日志纪录
35   - log.info("pd client receive: " + ConvertUtil.bytesToHexString(bytes));
  35 + log.info("pd receive: " + ConvertUtil.bytesToHexString(bytes));
36 36 }catch (Exception e){
37 37 log.error("pd message decoder:", e);
38 38 }
... ...
src/main/java/com/bsth/client/pd/handler/PdClientHandler.java
... ... @@ -5,6 +5,7 @@ import com.bsth.client.msg.IMessageBody;
5 5 import com.bsth.client.pd.protocol.PdMessage;
6 6 import com.bsth.client.pd.protocol.Pd_41_0;
7 7 import com.bsth.client.pd.protocol.Pd_42_0;
  8 +import com.bsth.entity.GpsEntity;
8 9 import org.apache.mina.core.service.IoHandlerAdapter;
9 10 import org.apache.mina.core.session.IdleStatus;
10 11 import org.apache.mina.core.session.IoSession;
... ... @@ -67,20 +68,16 @@ public class PdClientHandler extends IoHandlerAdapter{
67 68 PdMessage msg = (PdMessage)message;
68 69 IMessageBody body = msg.getMessageBody();
69 70 if (body != null) {
70   - //String deviceId = body.getDeviceId();
71 71 if (0x31 == msg.getCommandType()) {
72   - //IoSession regSession = SessionManager.getInstance().getSession(deviceId);
73   - //if (regSession != null) regSession.closeNow();
74   - //SessionManager.getInstance().register(deviceId, session);
75 72 log.debug("设备编号:" + body.getDeviceId() + "建立连接");
76 73 }
77 74 else if(0x41 == msg.getCommandType()){
78 75 Pd_41_0 pd41 = (Pd_41_0)msg.getMessageBody();
79   - gpsBeforeBuffer.put(pd41.getInfo());
  76 + gpsBeforeBuffer.put(new GpsEntity(pd41.getInfo(), msg.getVersion(), 1));
80 77 }
81 78 else if(0x42 == msg.getCommandType()){
82 79 Pd_42_0 pd42 = (Pd_42_0)msg.getMessageBody();
83   - gpsBeforeBuffer.put(pd42.getInfo());
  80 + gpsBeforeBuffer.put(new GpsEntity(pd42.getInfo(), msg.getVersion(), 1));
84 81 }
85 82 }
86 83 }
... ...
src/main/java/com/bsth/client/pf/codec/MessageDecoder.java
... ... @@ -31,7 +31,7 @@ public class MessageDecoder extends CumulativeProtocolDecoder {
31 31 msg.read(bytes);
32 32  
33 33 //日志纪录
34   - log.info("pf client receive: " + ConvertUtil.bytesToHexString(bytes));
  34 + log.info("pf receive: " + ConvertUtil.bytesToHexString(bytes));
35 35 out.write(msg);
36 36 }catch (Exception e){
37 37 log.error("pf message decoder:", e);
... ... @@ -44,12 +44,4 @@ public class MessageDecoder extends CumulativeProtocolDecoder {
44 44 }
45 45 return false;
46 46 }
47   -
48   - /*private static String toHexString(byte[] bytes) {
49   - StringBuilder sb = new StringBuilder();
50   - for (byte b : bytes) {
51   - sb.append(Integer.toHexString(b & 0xff) + "|");
52   - }
53   - return sb.toString();
54   - }*/
55 47 }
... ...
src/main/java/com/bsth/client/pf/handler/PfClientHandler.java
... ... @@ -5,6 +5,7 @@ import com.bsth.client.common.Protocol2BizUtil;
5 5 import com.bsth.client.msg.IMessageBody;
6 6 import com.bsth.client.pd.protocol.BasicInfo;
7 7 import com.bsth.client.pf.protocol.PfMessage;
  8 +import com.bsth.entity.GpsEntity;
8 9 import org.apache.mina.core.service.IoHandlerAdapter;
9 10 import org.apache.mina.core.session.IdleStatus;
10 11 import org.apache.mina.core.session.IoSession;
... ... @@ -70,7 +71,7 @@ public class PfClientHandler extends IoHandlerAdapter{
70 71 }
71 72  
72 73 BasicInfo info = Protocol2BizUtil.getBasicInfoFromMsg(msg);
73   - gpsBeforeBuffer.put(info);
  74 + gpsBeforeBuffer.put(new GpsEntity(info, msg.getVersion(), 0));
74 75 }
75 76  
76 77 }
... ...
src/main/java/com/bsth/controller/GpsRealDataController.java 0 → 100644
  1 +package com.bsth.controller;
  2 +
  3 +import com.bsth.client.GpsBeforeBuffer;
  4 +import com.bsth.entity.GpsEntity;
  5 +import org.springframework.web.bind.annotation.RequestMapping;
  6 +import org.springframework.web.bind.annotation.RestController;
  7 +
  8 +import java.util.List;
  9 +
  10 +/**
  11 + * Created by panzhao on 2017/5/12.
  12 + */
  13 +@RestController
  14 +@RequestMapping("realGps")
  15 +public class GpsRealDataController {
  16 +
  17 +
  18 + @RequestMapping("/all")
  19 + public List<GpsEntity> all(){
  20 + return GpsBeforeBuffer.pollAll();
  21 + }
  22 +}
... ...
src/main/java/com/bsth/entity/GpsEntity.java
1 1 package com.bsth.entity;
2 2  
  3 +import com.bsth.client.pd.protocol.BasicInfo;
  4 +
3 5 /**
4 6 * @author PanZhao
5 7 * @ClassName: GpsRealData
... ... @@ -24,26 +26,6 @@ public class GpsEntity {
24 26 private String deviceId;
25 27  
26 28 /**
27   - * 停车场编码
28   - */
29   - private String carparkNo;
30   -
31   - /**
32   - * 站点编码
33   - */
34   - private String stopNo;
35   -
36   - /**
37   - * 站点名称
38   - */
39   - private String stationName;
40   -
41   - /**
42   - * 到站时间
43   - */
44   - private long arrTime;
45   -
46   - /**
47 29 * 经度
48 30 */
49 31 private Float lon;
... ... @@ -85,28 +67,55 @@ public class GpsEntity {
85 67 */
86 68 private int valid;
87 69  
88   - public String getDeviceId() {
89   - return deviceId;
90   - }
  70 + /**
  71 + * 数据来源
  72 + * 1:网关
  73 + * 0:转发
  74 + */
  75 + private int source;
91 76  
92   - public void setDeviceId(String deviceId) {
93   - this.deviceId = deviceId;
  77 + public GpsEntity(BasicInfo basicInfo, int version, int source) {
  78 + //放弃补发数据
  79 + byte cacheData = getCacheState(basicInfo.getServiceState());
  80 + if(cacheData == 1)
  81 + return;
  82 +
  83 + this.setDeviceId(basicInfo.getDeviceId());
  84 + this.setTimestamp(basicInfo.getTimestamp());
  85 + this.setLat(basicInfo.getLat());
  86 + this.setLon(basicInfo.getLon());
  87 + this.setDirection((float)basicInfo.getDirection() / 10);
  88 + this.setValid(basicInfo.getGpsValid());
  89 + this.setCompanyCode(basicInfo.getCompanyCode());
  90 + this.setUpDown(basicInfo.getUpOrDown());
  91 + this.setSpeed((float)basicInfo.getSpeedGps() / 10);
  92 + this.setLineId(String.valueOf(basicInfo.getLineId()));
  93 + this.setState((int) getService(basicInfo.getServiceState()));
  94 + this.setVersion(version);
  95 + this.setSource(source);
94 96 }
95 97  
96   - public String getCarparkNo() {
97   - return carparkNo;
  98 + public static byte getCacheState(long serviceState) {
  99 + return (byte)(((serviceState & 0x00100000) == 0x00100000) ? 1 : 0);
98 100 }
99 101  
100   - public void setCarparkNo(String carparkNo) {
101   - this.carparkNo = carparkNo;
  102 + /**
  103 + * 获取运营状态
  104 + *
  105 + * @return -1无效 0运营 1未运营
  106 + */
  107 + public static byte getService(long serviceState) {
  108 + if ((serviceState & 0x00020000) == 0x00020000 || (serviceState & 0x80000000) == 0x80000000)
  109 + return -1;
  110 + return (byte) (((serviceState & 0x02000000) == 0x02000000) ? 1 : 0);
102 111 }
103 112  
104   - public String getStopNo() {
105   - return stopNo;
  113 + public String getDeviceId() {
  114 + return deviceId;
106 115 }
107 116  
108   - public void setStopNo(String stopNo) {
109   - this.stopNo = stopNo;
  117 + public void setDeviceId(String deviceId) {
  118 + this.deviceId = deviceId;
110 119 }
111 120  
112 121 public Float getLon() {
... ... @@ -142,24 +151,6 @@ public class GpsEntity {
142 151 this.state = state;
143 152 }
144 153  
145   -
146   - public String getStationName() {
147   - return stationName;
148   - }
149   -
150   - public void setStationName(String stationName) {
151   - this.stationName = stationName;
152   - }
153   -
154   - public long getArrTime() {
155   - return arrTime;
156   - }
157   -
158   - public void setArrTime(long arrTime) {
159   - this.arrTime = arrTime;
160   - }
161   -
162   -
163 154 public String getLineId() {
164 155 return lineId;
165 156 }
... ... @@ -218,4 +209,12 @@ public class GpsEntity {
218 209 public void setSpeed(Float speed) {
219 210 this.speed = speed;
220 211 }
  212 +
  213 + public int getSource() {
  214 + return source;
  215 + }
  216 +
  217 + public void setSource(int source) {
  218 + this.source = source;
  219 + }
221 220 }
... ...
src/main/resources/logback.xml
... ... @@ -9,7 +9,29 @@
9 9  
10 10 <layout class="ch.qos.logback.classic.PatternLayout">
11 11 <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
12   - <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%file:%line] %-5level-%msg%n
  12 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} -%msg%n
  13 + </pattern>
  14 + </layout>
  15 + </appender>
  16 +
  17 + <!-- 浦东网关 -->
  18 + <appender name="FILE"
  19 + class="ch.qos.logback.core.rolling.RollingFileAppender">
  20 + <file>${LOG_BASE}/main.log</file>
  21 + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  22 + <fileNamePattern>${LOG_BASE}/main-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
  23 + <timeBasedFileNamingAndTriggeringPolicy
  24 + class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
  25 + <maxFileSize>100MB</maxFileSize>
  26 + </timeBasedFileNamingAndTriggeringPolicy>
  27 + </rollingPolicy>
  28 + <encoder>
  29 + <pattern>%msg%n</pattern>
  30 + </encoder>
  31 +
  32 + <layout class="ch.qos.logback.classic.PatternLayout">
  33 + <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
  34 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} -%msg%n
13 35 </pattern>
14 36 </layout>
15 37 </appender>
... ... @@ -31,10 +53,15 @@
31 53  
32 54 <layout class="ch.qos.logback.classic.PatternLayout">
33 55 <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
34   - <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%file:%line] %-5level-%msg%n
  56 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} -%msg%n
35 57 </pattern>
36 58 </layout>
37 59 </appender>
  60 + <logger name="com.bsth.client.pd.codec.MessageDecoder"
  61 + level="INFO" additivity="false">
  62 + <appender-ref ref="PD_FILE" />
  63 + </logger>
  64 +
38 65  
39 66 <!-- 浦东转发 -->
40 67 <appender name="PF_FILE"
... ... @@ -53,10 +80,14 @@
53 80  
54 81 <layout class="ch.qos.logback.classic.PatternLayout">
55 82 <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
56   - <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%file:%line] %-5level-%msg%n
  83 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level-%msg%n
57 84 </pattern>
58 85 </layout>
59 86 </appender>
  87 + <logger name="com.bsth.client.pf.codec.MessageDecoder"
  88 + level="INFO" additivity="false">
  89 + <appender-ref ref="PF_FILE" />
  90 + </logger>
60 91  
61 92  
62 93 <!-- 日志输出级别 -->
... ...