Commit a6bee16bf786fc34bb4c043c5e24244b897fefbc

Authored by 潘钊
1 parent 460b9760

update...

src/main/java/com/bsth/controller/realcontrol/anomalyCheckController.java
... ... @@ -56,4 +56,14 @@ public class anomalyCheckController {
56 56 ClientApp.pdreconn();
57 57 ClientApp.pfreconn();
58 58 }
  59 +
  60 + @RequestMapping(value = "/pdClose", method = RequestMethod.POST)
  61 + public void pdClose(){
  62 + ClientApp.pdClose();
  63 + }
  64 +
  65 + @RequestMapping(value = "/pfClose", method = RequestMethod.POST)
  66 + public void pfClose(){
  67 + ClientApp.pfClose();
  68 + }
59 69 }
... ...
src/main/java/com/bsth/data/car_out_info/CarOutInfoHandler.java
... ... @@ -102,7 +102,7 @@ public class CarOutInfoHandler implements CommandLineRunner, CarOutInfo {
102 102 ps.setString(2, sch.getXlBm());
103 103 ps.setString(3, sch.getXlName());
104 104 ps.setString(4, sch.getLpName());
105   - ps.setInt(5, sch.getFcno());
  105 + ps.setInt(5, sch.getFcno()==null?-1:sch.getFcno());
106 106 ps.setString(6, sch.getDfsj());
107 107 ps.setString(7, sch.getClZbh());
108 108 ps.setString(8, BasicData.nbbmCompanyPlateMap.get(sch.getClZbh()));
... ...
src/main/java/com/bsth/data/gpsdata/arrival/GpsRealAnalyse.java
... ... @@ -49,6 +49,8 @@ public class GpsRealAnalyse {
49 49 //如果正在恢复数据
50 50 if (GpsDataRecovery.run)
51 51 return;
  52 +
  53 + long t = System.currentTimeMillis();
52 54 logger.info("analyse gps size: " + list.size());
53 55 //按车辆分组gps
54 56 ArrayListMultimap multimap = ArrayListMultimap.create();
... ... @@ -70,6 +72,8 @@ public class GpsRealAnalyse {
70 72 //加入实时gps对照
71 73 for(GpsEntity gps: list)
72 74 gpsRealData.put(gps);
  75 +
  76 + logger.info("time , " + (System.currentTimeMillis() - t));
73 77 } catch (InterruptedException e) {
74 78 logger.error("", e);
75 79 }
... ...
src/main/java/com/bsth/data/gpsdata/client/ClientApp.java
... ... @@ -21,10 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
21 21 import org.springframework.stereotype.Component;
22 22  
23 23 import java.net.InetSocketAddress;
24   -import java.util.concurrent.Executors;
25   -import java.util.concurrent.ScheduledExecutorService;
26   -import java.util.concurrent.ThreadFactory;
27   -import java.util.concurrent.TimeUnit;
  24 +import java.util.concurrent.*;
28 25  
29 26 /**
30 27 * Created by panzhao on 2017/5/4.
... ... @@ -43,6 +40,7 @@ public class ClientApp {
43 40 GpsBeforeBuffer gpsBeforeBuffer;
44 41  
45 42 static Logger logger = LoggerFactory.getLogger(ClientApp.class);
  43 + private static ExecutorService exec;
46 44  
47 45 private ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
48 46  
... ... @@ -85,12 +83,52 @@ public class ClientApp {
85 83 return flag;
86 84 }
87 85  
  86 + public static void pdconnect(final String device) {
  87 + exec.submit(new Runnable() {
  88 +
  89 + @Override
  90 + public void run() {
  91 + // TODO Auto-generated method stub
  92 + long now = System.currentTimeMillis();
  93 + boolean flag = false;
  94 + while (!flag) {
  95 + flag = dconnect(device);
  96 + }
  97 + System.out.println("设备编号:" + device + "重连, cost time: " + (System.currentTimeMillis() - now));
  98 + }
  99 + });
  100 + }
  101 +
  102 + public static void pfconnect(final String device) {
  103 + exec.submit(new Runnable() {
  104 +
  105 + @Override
  106 + public void run() {
  107 + // TODO Auto-generated method stub
  108 + long now = System.currentTimeMillis();
  109 + boolean flag = false;
  110 + while (!flag) {
  111 + flag = fconnect(device);
  112 + }
  113 + System.out.println("重连, cost time: " + (System.currentTimeMillis() - now));
  114 + }
  115 + });
  116 + }
  117 +
88 118 public static void pdreconn(){
89   - dconnect(ConfigUtil.get("forward.device.name"));
  119 + pdconnect(ConfigUtil.get("forward.device.name"));
90 120 }
91 121  
92 122 public static void pfreconn(){
93   - fconnect(ConfigUtil.get("forward.device.name"));
  123 + pfconnect(ConfigUtil.get("forward.device.name"));
  124 + }
  125 +
  126 + public static void pdClose(){
  127 + pdSession.closeNow();
  128 + }
  129 +
  130 + public static void pfClose(){
  131 + pfSession.closeNow();
94 132 }
95 133  
96 134 public static boolean fconnect(String device) {
... ... @@ -111,7 +149,7 @@ public class ClientApp {
111 149 }
112 150  
113 151 public void init() {
114   - //exec = Executors.newFixedThreadPool(4);
  152 + exec = Executors.newFixedThreadPool(50);
115 153 sexec.scheduleAtFixedRate(new SessionChecker(), 1, 1, TimeUnit.MINUTES);
116 154 /*******************************浦东********************************/
117 155 pdDataConnector = new NioSocketConnector();
... ... @@ -133,7 +171,7 @@ public class ClientApp {
133 171  
134 172 pdDataConnector.setHandler(pdClient);
135 173  
136   - dconnect(ConfigUtil.get("forward.device.name"));
  174 + pdconnect(ConfigUtil.get("forward.device.name"));
137 175 /*******************************浦东转发********************************/
138 176 pfDataConnector = new NioSocketConnector();
139 177  
... ... @@ -153,7 +191,7 @@ public class ClientApp {
153 191 config1.setIdleTime(IdleStatus.BOTH_IDLE, 60);
154 192  
155 193 pfDataConnector.setHandler(pfClient);
156   - fconnect(ConfigUtil.get("forward.device.name"));
  194 + pfconnect(ConfigUtil.get("forward.device.name"));
157 195  
158 196  
159 197 gpsBeforeBuffer.init();
... ... @@ -182,6 +220,5 @@ public class ClientApp {
182 220 logger.error("SessionChecker异常", e);
183 221 }
184 222 }
185   -
186 223 }
187 224 }
188 225 \ No newline at end of file
... ...
src/main/java/com/bsth/data/gpsdata/client/DataMonitor.java deleted 100644 → 0
1   -package com.bsth.data.gpsdata.client;
2   -
3   -/**
4   - * Created by panzhao on 2017/5/7.
5   - */
6   -public class DataMonitor {
7   -
8   - public static long lastTimePd;
9   -
10   - public static long lastTimePf;
11   -
12   -}
src/main/java/com/bsth/data/gpsdata/client/GpsBeforeBuffer.java
... ... @@ -63,7 +63,7 @@ public class GpsBeforeBuffer {
63 63 }
64 64  
65 65 public void init(){
66   - Application.mainServices.scheduleWithFixedDelay(gpsHandleThread, 20 * 1000, 1100, TimeUnit.MILLISECONDS);
  66 + Application.mainServices.scheduleWithFixedDelay(gpsHandleThread, 20 * 1000, 1200, TimeUnit.MILLISECONDS);
67 67 }
68 68  
69 69 @Component
... ... @@ -79,7 +79,7 @@ public class GpsBeforeBuffer {
79 79 list = new ArrayList<>(100);
80 80  
81 81 GpsEntity gps;
82   - for(int i = 0; i < 2000; i ++){
  82 + for(int i = 0; i < 4000; i ++){
83 83 gps = linkedList.poll();
84 84 if(gps == null)
85 85 break;
... ...
src/main/java/com/bsth/data/gpsdata/client/pd/codec/MessageDecoder.java
1 1 package com.bsth.data.gpsdata.client.pd.codec;
2 2  
  3 +import com.bsth.data.gpsdata.client.pd.common.ConvertUtil;
3 4 import com.bsth.data.gpsdata.client.pd.protocol.PdMessage;
4 5 import org.apache.mina.core.buffer.IoBuffer;
5 6 import org.apache.mina.core.session.IoSession;
6 7 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
7 8 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
  9 +import org.slf4j.Logger;
  10 +import org.slf4j.LoggerFactory;
8 11  
9 12  
10 13 public class MessageDecoder extends CumulativeProtocolDecoder {
11 14  
  15 + Logger log = LoggerFactory.getLogger(MessageDecoder.class);
  16 +
12 17 @Override
13 18 protected boolean doDecode(IoSession session, IoBuffer in,
14 19 ProtocolDecoderOutput out) throws Exception {
... ... @@ -19,15 +24,18 @@ public class MessageDecoder extends CumulativeProtocolDecoder {
19 24 int len = ((lenh & 0xff) << 8) + (lenl & 0xff);
20 25 if ((head1 & 0xff) == 0xfa && (head2 & 0xff) == 0xf5) {
21 26 if (in.remaining() > len) {
22   - byte[] bytes = new byte[len + 1];
23   - in.get(bytes);
24   - PdMessage msg = new PdMessage();
25   - msg.read(bytes);
26   - out.write(msg);
27   - /*if ("true".equals(ConfigUtil.getProperty("protocolup", "true"))) {
28   - UpProtocolDataService.getInstance().write(new byte[]{ head1, head2, lenh, lenl });
29   - UpProtocolDataService.getInstance().write(bytes);
30   - }*/
  27 + try {
  28 + byte[] bytes = new byte[len + 1];
  29 + in.get(bytes);
  30 + PdMessage msg = new PdMessage();
  31 + msg.read(bytes);
  32 + out.write(msg);
  33 +
  34 + //日志纪录
  35 + log.info("pd client receive: " + ConvertUtil.bytesToHexString(bytes));
  36 + }catch (Exception e){
  37 + log.error("pd message decoder:", e);
  38 + }
31 39 } else {
32 40 in.reset();
33 41 return false;
... ...
src/main/java/com/bsth/data/gpsdata/client/pd/common/ConvertUtil.java
... ... @@ -126,4 +126,24 @@ public class ConvertUtil {
126 126  
127 127 return val;
128 128 }
  129 +
  130 + /* Convert byte[] to hex string.这里我们可以将byte转换成int,然后利用Integer.toHexString(int)来转换成16进制字符串。
  131 + * @param src byte[] data
  132 +* @return hex string
  133 +*/
  134 + public static String bytesToHexString(byte[] src){
  135 + StringBuilder stringBuilder = new StringBuilder("");
  136 + if (src == null || src.length <= 0) {
  137 + return null;
  138 + }
  139 + for (int i = 0; i < src.length; i++) {
  140 + int v = src[i] & 0xFF;
  141 + String hv = Integer.toHexString(v);
  142 + if (hv.length() < 2) {
  143 + stringBuilder.append(0);
  144 + }
  145 + stringBuilder.append(hv);
  146 + }
  147 + return stringBuilder.toString();
  148 + }
129 149 }
... ...
src/main/java/com/bsth/data/gpsdata/client/pd/handler/PdClientHandler.java
... ... @@ -52,6 +52,7 @@ public class PdClientHandler extends IoHandlerAdapter{
52 52  
53 53 @Override
54 54 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
  55 + log.warn("pd sessionIdle");
55 56 session.closeNow();
56 57 }
57 58  
... ...
src/main/java/com/bsth/data/gpsdata/client/pf/codec/MessageDecoder.java
1 1 package com.bsth.data.gpsdata.client.pf.codec;
2 2  
  3 +import com.bsth.data.gpsdata.client.pd.common.ConvertUtil;
3 4 import com.bsth.data.gpsdata.client.pf.protocol.PfMessage;
4 5 import org.apache.mina.core.buffer.IoBuffer;
5 6 import org.apache.mina.core.session.IoSession;
6 7 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
7 8 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
  9 +import org.slf4j.Logger;
  10 +import org.slf4j.LoggerFactory;
8 11  
9 12  
10 13 public class MessageDecoder extends CumulativeProtocolDecoder {
11 14  
  15 + Logger log = LoggerFactory.getLogger(MessageDecoder.class);
  16 +
12 17 @Override
13 18 protected boolean doDecode(IoSession session, IoBuffer in,
14 19 ProtocolDecoderOutput out) throws Exception {
... ... @@ -19,11 +24,18 @@ public class MessageDecoder extends CumulativeProtocolDecoder {
19 24 int len = ((lenh & 0xff) << 8) + (lenl & 0xff);
20 25 if ((head1 & 0xff) == 0xfa && (head2 & 0xff) == 0xf5) {
21 26 if (in.remaining() > len) {
22   - byte[] bytes = new byte[len + 1];
23   - in.get(bytes);
24   - PfMessage msg = new PfMessage();
25   - msg.read(bytes);
26   - out.write(msg);
  27 + try {
  28 + byte[] bytes = new byte[len + 1];
  29 + in.get(bytes);
  30 + PfMessage msg = new PfMessage();
  31 + msg.read(bytes);
  32 +
  33 + //日志纪录
  34 + log.info("pf client receive: " + ConvertUtil.bytesToHexString(bytes));
  35 + out.write(msg);
  36 + }catch (Exception e){
  37 + log.error("pf message decoder:", e);
  38 + }
27 39 } else {
28 40 in.reset();
29 41 return false;
... ...
src/main/java/com/bsth/data/gpsdata/client/pf/handler/PfClientHandler.java
... ... @@ -49,6 +49,7 @@ public class PfClientHandler extends IoHandlerAdapter{
49 49  
50 50 @Override
51 51 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
  52 + log.warn("pf sessionIdle");
52 53 session.closeNow();
53 54 }
54 55  
... ...